feat: Execution Monitor View with real-time SSE streaming (#332) #339
Implements Steps 1-12 of the execution monitor plan:
- Bridge navigation gap: Task Board execute buttons now navigate to /execution
- Add Blocker, BatchResponse, and UIAgentState types
- Extend API client with blockersApi, batchesApi, and execution controls
- Create event styling utilities with agent state derivation
- Build 6 event display components (planning, file change, shell, verification, blocker, base item)
- Build 4 container components (header, progress, event stream, changes sidebar)
- Add 3 Shadcn UI primitives (alert-dialog, progress, scroll-area)
- Create useExecutionMonitor hook with rAF-batched state accumulation
- Create single-task execution page at /execution/[taskId]
- Create execution landing page with batch/task routing
- Create BatchExecutionMonitor with accordion task rows
- Enable Execution nav link in sidebar
- Add barrel export for all execution components
- useExecutionMonitor hook: 12 tests covering state derivation, event accumulation, progress tracking, completion detection, heartbeat filtering, and taskId reset
- EventItem component: 12 tests covering delegation to specialized components for each event type (planning, file change, shell, verification, blocker, completion, error)
- BlockerEvent component: 7 tests covering form rendering, submit behavior, API call, error display, and whitespace trimming
- eventStyles utilities: 14 tests covering deriveAgentState for all event types and completeness of style/icon/label maps
- Update AppSidebar test to reflect Execution nav now enabled
- Add execution monitor icons to Hugeicons mock

252 tests passing, 24 suites.
1. ExecutionHeader: await onStop() so loading spinner is visible
2. EventStream: remove ScrollArea wrapper to fix scroll detection (nested overflow containers broke onScroll + scrollIntoView)
3. BatchExecutionMonitor: use ref instead of state for task ID dedup to prevent polling interval reset on each task load
4. BatchExecutionMonitor: add try/catch to stop/cancel handlers
5. eventStyles: map blocked completion to BLOCKED, not FAILED
6. BlockerEvent: add aria-label to textarea
7. ChangesSidebar: add aria-expanded to collapse button
8. BatchExecutionMonitor: add aria-expanded to accordion rows
Walkthrough
Adds a client-side Execution Monitor with SSE streaming, new hooks (useTaskStream opt, useExecutionMonitor), a suite of execution UI components/pages (including blocker answer flow, event stream with auto-scroll, progress indicator, changes sidebar, and batch monitor), backend streaming/task changes, API additions for blockers/batches/stop/resume, Radix UI primitives, icon mock additions, and many tests.
Sequence Diagram(s)
sequenceDiagram
participant User
participant Page as ExecutionPage
participant Hook as useExecutionMonitor
participant SSE as SSE_Stream
participant Stream as EventStream
participant Blocker as BlockerEvent
participant API as Tasks/Blockers_API
User->>Page: open /execution/{taskId}
Page->>Hook: start(taskId, workspacePath)
Hook->>SSE: connect to /api/v2/tasks/{taskId}/stream
loop streaming events
SSE->>Hook: ExecutionEvent (progress/output/blocker/completion/heartbeat)
Hook->>Hook: buffer event & schedule rAF flush
Hook->>Page: batched state update (agentState, steps, message, events)
Page->>Stream: render events list
end
alt Blocker event
Stream->>Blocker: render blocker question
User->>Blocker: submit answer
Blocker->>API: POST /api/v2/blockers/{id}/answer
API-->>Blocker: success
Blocker->>Page: onAnswered callback (resume stream)
end
alt Stop action
User->>Page: click Stop
Page->>API: POST /api/v2/tasks/{id}/stop
API-->>Page: success
Page->>Hook: close()
end
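The "buffer event & schedule rAF flush" step in the diagram can be sketched as below. This is a minimal illustration rather than the hook's actual code: `StreamEvent`, `createEventBatcher`, and the injectable `FrameScheduler` are hypothetical names, and a manual scheduler stands in for `requestAnimationFrame` so the sketch runs outside a browser.

```typescript
// Hypothetical event shape; the PR's real ExecutionEvent union is richer.
interface StreamEvent {
  event_type: 'progress' | 'output' | 'blocker' | 'completion' | 'heartbeat';
  message?: string;
}

// Anything that runs a callback "on the next frame" (assumed abstraction).
type FrameScheduler = (flush: () => void) => void;

// Buffers incoming events and delivers them as one batch per scheduled frame.
function createEventBatcher(
  onFlush: (batch: StreamEvent[]) => void,
  schedule: FrameScheduler,
) {
  let buffer: StreamEvent[] = [];
  let scheduled = false;

  return {
    push(ev: StreamEvent): void {
      buffer.push(ev);
      if (scheduled) return; // a flush is already pending for this frame
      scheduled = true;
      schedule(() => {
        scheduled = false;
        const batch = buffer;
        buffer = [];
        onFlush(batch);
      });
    },
  };
}

// Usage with a manual scheduler standing in for requestAnimationFrame.
const pendingFrames: Array<() => void> = [];
const flushedBatches: StreamEvent[][] = [];
const batcher = createEventBatcher(
  (batch) => flushedBatches.push(batch),
  (flush) => pendingFrames.push(flush),
);

batcher.push({ event_type: 'progress', message: 'planning' });
batcher.push({ event_type: 'output', message: 'npm test' });
batcher.push({ event_type: 'heartbeat' });
pendingFrames.splice(0).forEach((run) => run()); // simulate one animation frame
```

In a browser the scheduler would presumably be `(flush) => { requestAnimationFrame(flush); }`, so three events arriving within one frame cause a single state update instead of three.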
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks | ✅ 4 passed | ❌ 1 failed (1 warning)
Add real-time Execution Monitor views and move structured SSE events to
Code Review: Execution Monitor View (#332)
Overview
This is a well-architected and comprehensive implementation of real-time execution monitoring for CodeFRAME. The PR successfully bridges the navigation gap identified in #331 and delivers both single-task and batch execution monitoring with SSE streaming. The code quality is high, with excellent separation of concerns and 53 new tests.
✅ Strengths
Architecture & Design
Implementation Quality
🔍 Issues & Recommendations
Critical
None identified. The code review findings from the PR description appear to have been addressed.
High Priority
Medium Priority
Low Priority
🔒 Security Considerations
🚀 Performance Considerations
📝 Documentation & Testing
Strengths
Suggestions
🎯 Summary
This is production-ready code with only minor refinements needed. The implementation demonstrates:
Recommended Actions (Priority Order)
Approval Status
✅ Approved with minor suggestions. The issues identified are primarily polish items that don't block merging. Great work on this feature!
CLAUDE.md Compliance: ✅
};

function getStatusConfig(status: string) {
  return statusConfig[status] ?? { icon: Loading03Icon, className: 'text-gray-400', label: status };
🟢 Low
execution/BatchExecutionMonitor.tsx:41 Using bracket notation on a plain object can return inherited properties like constructor. Consider using Object.hasOwn(statusConfig, status) before the lookup to avoid prototype pollution.
-  return statusConfig[status] ?? { icon: Loading03Icon, className: 'text-gray-400', label: status };
+  return Object.hasOwn(statusConfig, status) ? statusConfig[status] : { icon: Loading03Icon, className: 'text-gray-400', label: status };
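To make the inherited-property concern concrete, here is a small standalone sketch. The `displayByStatus` map and helper names are hypothetical (icons are left out), and it uses `Object.prototype.hasOwnProperty.call`, which behaves like `Object.hasOwn` for this purpose but also compiles on targets older than ES2022.

```typescript
interface StatusDisplay {
  className: string;
  label: string;
}

// Hypothetical subset of a status map; the real one also carries icons.
const displayByStatus: Record<string, StatusDisplay> = {
  COMPLETED: { className: 'text-green-600', label: 'Completed' },
  FAILED: { className: 'text-red-600', label: 'Failed' },
};

function fallbackFor(statusKey: string): StatusDisplay {
  return { className: 'text-gray-400', label: statusKey };
}

// Unsafe: `??` only guards null/undefined, so inherited members leak through.
function lookupUnsafe(statusKey: string): unknown {
  return displayByStatus[statusKey] ?? fallbackFor(statusKey);
}

// Safe: only own properties count as configured statuses.
function lookupSafe(statusKey: string): StatusDisplay {
  return Object.prototype.hasOwnProperty.call(displayByStatus, statusKey)
    ? displayByStatus[statusKey]
    : fallbackFor(statusKey);
}

const leaked = lookupUnsafe('constructor'); // inherited Object constructor function
const guarded = lookupSafe('constructor');  // the gray fallback entry
```

The unsafe lookup only misbehaves for keys that collide with `Object.prototype` members, which is why the finding is rated low; the guard simply makes the fallback path total.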
for (const taskId of data.task_ids) {
  if (!fetchedTaskIdsRef.current.has(taskId)) {
    fetchedTaskIdsRef.current.add(taskId);
    tasksApi.getOne(workspacePath, taskId).then((task) => {
      setTasks((prev) => ({ ...prev, [taskId]: task }));
    }).catch(() => {
      // Task may have been deleted
    });
🟢 Low
execution/BatchExecutionMonitor.tsx:69 Task IDs are added to fetchedTaskIdsRef before the fetch completes. If tasksApi.getOne fails, the ID stays in the set and future polls skip it permanently. Consider adding to the set only on success (inside .then()), or removing from the set in the .catch() block.
 for (const taskId of data.task_ids) {
   if (!fetchedTaskIdsRef.current.has(taskId)) {
-    fetchedTaskIdsRef.current.add(taskId);
     tasksApi.getOne(workspacePath, taskId).then((task) => {
+      fetchedTaskIdsRef.current.add(taskId);
       setTasks((prev) => ({ ...prev, [taskId]: task }));
     }).catch(() => {
-      // Task may have been deleted
+      // Task may have been deleted or fetch failed - will retry on next poll
     });
Actionable comments posted: 7
🤖 Fix all issues with AI agents
In `@web-ui/src/app/execution/[taskId]/page.tsx`:
- Around line 31-36: The effect around useEffect calling tasksApi.getOne should
not swallow errors silently; update the handler in the useEffect so that on
rejection you set an explicit error or fallback state (e.g., setTask(null) or
setTask({ notFound: true }) and/or set an error boolean like setTaskError)
instead of an empty catch; reference tasksApi.getOne, setTask and the useEffect
callback to add a .catch(err => { setTask(null); setTaskError(err || true); })
(and adjust any loading logic that relies on task being undefined) so the UI can
show an error or "task not found" message instead of "Loading task…".
- Around line 42-45: The handleStop callback calls tasksApi.stopExecution but
doesn't catch errors, causing unhandled promise rejections; update the
handleStop implementation to wrap the await
tasksApi.stopExecution(workspacePath, taskId) call in try/catch (keeping any
existing finally logic), log the error (e.g., console.error or process logger)
and surface user feedback (e.g., set an error state or trigger a toast/snackbar)
so ExecutionHeader.handleStop shows a clear failure message to the user instead
of letting the rejection bubble up.
In `@web-ui/src/app/execution/page.tsx`:
- Around line 64-83: The effect currently sets resolving to false whenever it
early-returns (including when workspacePath is still null), causing a flash of
the empty state; change the early-return logic in the useEffect so you first
bail out silently if workspacePath is falsy (just return) and only call
setResolving(false) when there are explicit params (batchId || taskIdParam), and
keep the existing tasksApi.getAll(...) flow for when workspacePath is present so
resolving is cleared only after a successful redirect or on fetch error; update
the useEffect that references workspacePath, batchId, taskIdParam, setResolving,
tasksApi.getAll and router.replace accordingly.
In `@web-ui/src/components/execution/BatchExecutionMonitor.tsx`:
- Around line 89-98: In BatchExecutionMonitor's useEffect that sets/clears
pollRef using fetchBatch, the active-check compares batch.status against
lowercase strings; update that check to use the backend's UPPERCASE enum values
(e.g., replace ['completed','failed','cancelled'] with
['COMPLETED','FAILED','CANCELLED']) so the interval stops when batch.status
becomes a terminal state; ensure any other status comparisons in this component
(e.g., the later checks that reference 'COMPLETED'/'DONE') remain consistent
with uppercase enums.
In `@web-ui/src/components/execution/ShellCommandEvent.tsx`:
- Around line 28-35: The toggle Button in the ShellCommandEvent component
doesn't expose its state to assistive tech; add an aria-expanded attribute bound
to the existing expanded state (e.g., aria-expanded={expanded}) on the Button
used with onClick={() => setExpanded(!expanded)} so screen readers know whether
the collapsible region is open; ensure the attribute updates when expanded
changes and optionally add aria-controls referencing the collapsible region's id
if present.
In `@web-ui/src/components/execution/VerificationEvent.tsx`:
- Line 19: The current pass detection uses a too-broad regex in the
VerificationEvent component (const passed = /pass/i.test(event.message ?? ''))
and can match words like "bypass" or "password"; change the regex to use word
boundaries such as /\bpass\b/i (or /^\s*pass\b/i if you want to anchor to the
start) so it only matches the standalone word "pass", keeping the
null-coalescing on event.message; update the const passed assignment to use this
new regex and keep the rest of the component unchanged.
In `@web-ui/src/hooks/useExecutionMonitor.ts`:
- Around line 88-118: The rAF flush in scheduleFlush can overwrite a recent
DISCONNECTED state because it derives agentState only from events; update
scheduleFlush (and the similar flush at the other location) to consult the
current SSE status when computing agentState: capture the latest sseStatus
(e.g., via a sseStatusRef or by adding sseStatus to the closure) and if it
indicates disconnected/closed set agentState = 'DISCONNECTED' (or preserve
DISCONNECTED) before calling deriveAgentState(findLastNonHeartbeat(...)); ensure
rafRef logic and setState merge still run but that agentState is gated by the
live SSE status so a pending rAF cannot revert a DISCONNECTED state.
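The last item (gating `agentState` on the live SSE status) can be sketched as a pure function. This is an illustration under assumptions: the `UIAgentState` members listed here, the choice of `'closed'` and `'error'` as the disconnected statuses, and the name `resolveAgentState` are all hypothetical; only the idea that a pending flush must not overwrite DISCONNECTED comes from the review.

```typescript
type SSEStatus = 'idle' | 'connecting' | 'open' | 'closed' | 'error';

// Assumed set of UI states; the PR's UIAgentState may differ.
type UIAgentState =
  | 'CONNECTING'
  | 'PLANNING'
  | 'EXECUTING'
  | 'BLOCKED'
  | 'COMPLETED'
  | 'FAILED'
  | 'DISCONNECTED';

// States after which a closed stream is expected rather than a problem.
const TERMINAL_STATES: ReadonlyArray<UIAgentState> = ['COMPLETED', 'FAILED'];

// Combine the state derived from events with the live connection status,
// so a late flush cannot replace DISCONNECTED with a stale event-based state.
function resolveAgentState(
  sseStatus: SSEStatus,
  derivedFromEvents: UIAgentState,
): UIAgentState {
  const streamDown = sseStatus === 'closed' || sseStatus === 'error';
  if (streamDown && TERMINAL_STATES.indexOf(derivedFromEvents) === -1) {
    return 'DISCONNECTED';
  }
  return derivedFromEvents;
}

const whileStreaming = resolveAgentState('open', 'EXECUTING');
const afterDrop = resolveAgentState('error', 'EXECUTING');
const afterFinish = resolveAgentState('closed', 'COMPLETED');
```

Inside the hook, the flush callback would read the status from a ref (for example a `sseStatusRef`) so that the value is current at flush time rather than captured when the frame was scheduled.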
🧹 Nitpick comments (16)
web-ui/__tests__/components/layout/AppSidebar.test.tsx (1)
80-88: Test name is now slightly misleading.
The test is titled "renders disabled items as non-link spans" but now also asserts that `Tasks` and `Execution` are links. Consider renaming to something like "renders enabled items as links and disabled items as spans" for clarity.
web-ui/src/components/execution/ShellCommandEvent.tsx (1)
3-4: Unused imports: `CheckmarkCircle01Icon` and `Cancel01Icon`.
These two icons are imported but never referenced in the component's JSX. They can be removed to keep imports clean.
🧹 Proposed fix
-import { CommandLineIcon, CheckmarkCircle01Icon, Cancel01Icon } from '@hugeicons/react';
+import { CommandLineIcon } from '@hugeicons/react';
web-ui/src/components/execution/FileChangeEvent.tsx (2)
22-23: Duplicated regex pattern with `EventItem.isFileChangeEvent`.
The prefix-stripping regex here (`/^(Creating|Editing|Deleting) file:\s*/i`) is semantically coupled with the detection regex in `EventItem.tsx` line 45 (`/^(creating|editing|deleting) file:/i`). If the backend message format changes, both must be updated in tandem. Consider extracting a shared constant or utility (e.g., `FILE_CHANGE_PREFIX_RE`) to keep them in sync.
39-44: Placeholder diff content noted; ensure follow-up.
The expanded section currently re-displays `event.message` as a placeholder. The comment at lines 41-42 explains that diff content will come from subsequent `OutputEvent`s. This is fine for the initial implementation, but the "View Diff" label may mislead users since no actual diff is shown yet.
web-ui/src/components/execution/EventItem.tsx (1)
87-115: Consider guarding `pe.step` in the generic step branch.
At line 109, `pe.step` is rendered inside the `pe.total_steps > 0` guard, but `step` itself could be `0` or potentially `undefined` depending on the backend. If `step` is `undefined`, the UI would show "Step undefined/3". This is a minor edge case, but a simple guard or default would make it more robust.
Optional fix
 {pe.total_steps > 0 && (
   <span className="mr-1.5 text-xs text-muted-foreground">
-    Step {pe.step}/{pe.total_steps}
+    Step {pe.step ?? '?'}/{pe.total_steps}
   </span>
 )}
web-ui/src/components/execution/ChangesSidebar.tsx (1)
26-29: Consider adding `role` and `aria-label` to the sidebar container.
Screen readers won't identify this region's purpose. A quick `role="complementary"` and `aria-label="Changed files"` on the outer `<div>` would improve discoverability.
♿ Proposed fix
 <div
+  role="complementary"
+  aria-label="Changed files"
   className={`shrink-0 rounded-lg border bg-card transition-all ${
     collapsed ? 'w-10' : 'w-64'
   }`}
 >
web-ui/src/components/execution/BatchExecutionMonitor.tsx (2)
100-107: Auto-expand effect re-runs on every poll due to `batch` object reference.
`batch` is a new object on every `setBatch(data)` call, so this effect fires every 5 seconds. It short-circuits via the `expandedTaskId` guard, so it's not broken, but you could tighten the dependency to avoid unnecessary runs.
♻️ Narrow the dependency
 useEffect(() => {
   if (!batch || expandedTaskId) return;
   const inProgress = batch.task_ids.find(
     (id) => batch.results[id] === 'IN_PROGRESS'
   );
   if (inProgress) setExpandedTaskId(inProgress);
-}, [batch, expandedTaskId]);
+}, [batch?.status, batch?.task_ids, expandedTaskId]);
This still re-evaluates when the batch updates meaningfully but avoids re-running on every identical poll response. Alternatively, you could JSON-stringify the results map as a dependency key.
85-87: The `eslint-disable` for exhaustive-deps is unnecessary.
`fetchBatch` is a `useCallback` with `[workspacePath, batchId]` as deps, the same values listed in this effect's dependency array. Adding `fetchBatch` to the deps array would be correct and eliminate the need for the suppression comment.
♻️ Proposed fix
 useEffect(() => {
   fetchBatch();
-}, [batchId, workspacePath]); // eslint-disable-line react-hooks/exhaustive-deps
+}, [fetchBatch]);
web-ui/__tests__/components/execution/EventItem.test.tsx (1)
174-203: Consider adding a test for completion with no `files_modified` field.
The success completion test assumes `files_modified` is present. A quick test with `files_modified` omitted (or empty `[]`) would cover the fallback rendering path and guard against NPE regressions.
web-ui/src/types/index.ts (1)
125-136: `BatchResponse` fields should use union types to match backend enums for stronger type safety.
`strategy` is typed as `string` but `BatchStrategy` is already defined on Line 77 as `'serial' | 'parallel' | 'auto'`. The backend enums (from `codeframe/core/conductor.py`) show that `status` has six values: `'PENDING' | 'RUNNING' | 'COMPLETED' | 'PARTIAL' | 'FAILED' | 'CANCELLED'`, and `on_failure` has two values: `'continue' | 'stop'`. Replacing the `string` types with these union types will catch mismatches at compile time rather than runtime.
♻️ Proposed tightening
+export type BatchStatus = 'PENDING' | 'RUNNING' | 'COMPLETED' | 'PARTIAL' | 'FAILED' | 'CANCELLED';
+export type BatchOnFailure = 'continue' | 'stop';
+
 export interface BatchResponse {
   id: string;
   workspace_id: string;
   task_ids: string[];
-  status: string;
-  strategy: string;
+  status: BatchStatus;
+  strategy: BatchStrategy;
   max_parallel: number;
-  on_failure: string;
+  on_failure: BatchOnFailure;
   started_at: string | null;
   completed_at: string | null;
   results: Record<string, string>; // task_id → RunStatus
 }
web-ui/src/components/execution/ExecutionHeader.tsx (1)
85-93: Connection status dot is not accessible to screen readers.
The `<span>` with `title` is not focusable and `title` is not reliably announced by assistive technology. The visible text label at Line 92 mitigates this for sighted users, but consider adding `role="status"` and an `aria-label` on the containing `<div>` so screen readers announce connection changes.
Suggested improvement
-<div className="flex items-center gap-1.5">
+<div className="flex items-center gap-1.5" role="status" aria-label={`Connection: ${sseStatus}`}>
   <span
     className={cn(
       'inline-block h-2 w-2 rounded-full',
       connectionDotStyles[sseStatus] ?? connectionDotStyles.idle
     )}
-    title={`Connection: ${sseStatus}`}
+    aria-hidden="true"
   />
web-ui/src/app/execution/[taskId]/page.tsx (1)
123-195: `CompletionBanner` uses raw `<button>` elements instead of the project's `Button` component.
The rest of the execution UI consistently uses the shared `Button` primitive. Using raw `<button>` here loses theming consistency (focus rings, variants, etc.) and duplicates styling. Consider switching to `Button` with appropriate variants.
web-ui/src/hooks/useExecutionMonitor.ts (1)
130-136: Unbounded event accumulation: `eventsRef` grows without limit.
Each new event creates a new array via spread (`[...eventsRef.current, event]`). For long-running tasks with thousands of events this causes increasing memory and GC pressure. Consider capping with a sliding window or switching to `push()` to avoid copying the entire array on each event.
web-ui/src/lib/eventStyles.ts (3)
73-83: `EXECUTING` and `COMPLETED` share identical badge styles.
Both map to `bg-green-100 text-green-800`. While labels and icons differentiate them, the identical color may reduce at-a-glance scannability in a busy event stream. Consider a distinct shade (e.g., `bg-emerald-100 text-emerald-800`) for `COMPLETED` if you want terminal states to stand out visually.
36-68: Remove type assertions in switch cases; use discriminated union narrowing instead.
Lines 39 and 56 use `as` casts (`as ProgressEvent`, `as CompletionEvent`). Since `ExecutionEvent` is a discriminated union with a literal `event_type` on each variant, TypeScript automatically narrows the type inside each `case` branch. The casts can be safely removed:
case 'progress': {
  const { phase } = event; // event is already ProgressEvent here
  // ...
}
case 'completion': {
  const { status } = event; // event is already CompletionEvent here
  // ...
}
This improves clarity and ensures type safety is enforced by the type system rather than bypassed by assertions.
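The narrowing behavior described in this comment can be checked with a standalone sketch. The three interfaces below are hypothetical stand-ins for the PR's event types (renamed so they do not clash with the DOM's built-in `ProgressEvent`); the `never` assignment in the default branch is an optional exhaustiveness check, not something the review asked for.

```typescript
// Hypothetical stand-ins for the PR's event variants.
interface TaskProgressEvent {
  event_type: 'progress';
  phase: string;
}
interface TaskCompletionEvent {
  event_type: 'completion';
  status: string;
}
interface TaskHeartbeatEvent {
  event_type: 'heartbeat';
}
type TaskEvent = TaskProgressEvent | TaskCompletionEvent | TaskHeartbeatEvent;

function describeEvent(ev: TaskEvent): string {
  switch (ev.event_type) {
    case 'progress':
      // ev is narrowed to TaskProgressEvent; no `as` cast needed
      return `phase: ${ev.phase}`;
    case 'completion':
      // ev is narrowed to TaskCompletionEvent
      return `finished: ${ev.status}`;
    case 'heartbeat':
      return 'heartbeat';
    default: {
      // Compile-time exhaustiveness check: fails if a variant is unhandled
      const unreachable: never = ev;
      return unreachable;
    }
  }
}

const progressLabel = describeEvent({ event_type: 'progress', phase: 'planning' });
const completionLabel = describeEvent({ event_type: 'completion', status: 'blocked' });
```

With a cast, adding a new variant to the union compiles silently; with narrowing plus the `never` check, the compiler points at the switch that needs updating.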
129-135: Tighten the key type for `connectionDotStyles`.
`Record<string, string>` allows any key, losing type safety. Narrow the key union to match the actual SSE status values (`'idle' | 'connecting' | 'open' | 'closed' | 'error'`); this will catch typos and improve autocomplete. Note: `SSEStatus` is already exported from `hooks/useEventSource.ts` and could be reused instead of defining a local type.
♻️ Suggested change
-export const connectionDotStyles: Record<string, string> = {
+import type { SSEStatus } from '@/hooks/useEventSource';
+
+export const connectionDotStyles: Record<SSEStatus, string> = {
   idle: 'bg-gray-400',
   connecting: 'bg-yellow-400 animate-pulse',
   open: 'bg-green-500',
   closed: 'bg-gray-400',
   error: 'bg-red-500',
 };
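Returning to the unbounded event accumulation nitpick for `useExecutionMonitor.ts` above, a sliding window could look roughly like this. `appendCapped` and the cap of 3 used in the usage lines are hypothetical; the review does not say what limit, if any, would suit the UI.

```typescript
// Append `next` and keep only the most recent `maxItems` entries.
// Returns a new array, so it stays compatible with immutable React state.
function appendCapped<T>(items: readonly T[], next: T, maxItems: number): T[] {
  const limit = Math.max(1, maxItems); // always keep at least the newest item
  const start = Math.max(0, items.length + 1 - limit);
  return items.slice(start).concat([next]);
}

// Usage: feed five events through a window of three.
let recentEvents: number[] = [];
for (let seq = 1; seq <= 5; seq++) {
  recentEvents = appendCapped(recentEvents, seq, 3);
}
```

This still copies on every append, but the copy is bounded by the window size rather than by the full history, which addresses the memory growth the comment describes. Dropping old events does mean the stream view loses its earliest entries, so the cap is a product decision as much as a technical one.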
useEffect(() => {
  if (batchId || taskIdParam || !workspacePath) {
    setResolving(false);
    return;
  }

  tasksApi
    .getAll(workspacePath, 'IN_PROGRESS')
    .then((response) => {
      const tasks = response.tasks ?? [];
      if (tasks.length > 0) {
        router.replace(`/execution/${tasks[0].id}`);
      } else {
        setResolving(false);
      }
    })
    .catch(() => {
      setResolving(false);
    });
}, [workspacePath, batchId, taskIdParam, router]);
Brief flash of "No active execution" when workspace hydrates without batch/task params.
On first render, workspacePath is null, so the effect hits the early return at Line 65 and unconditionally calls setResolving(false). When workspacePath hydrates and triggers a re-run, resolving is already false, so the component falls through to the empty state at Line 131 until the fetch completes and triggers a redirect.
Split the early-return logic so that resolving is only set to false when it's truly resolved (i.e. batch/task param is present), not when workspace hasn't loaded yet.
Proposed fix
If the effect simply returns without calling setResolving(false) when workspacePath is still null, the initial resolving=true is preserved, so the spinner keeps showing until the workspace hydrates and the fetch completes:
 useEffect(() => {
-  if (batchId || taskIdParam || !workspacePath) {
-    setResolving(false);
-    return;
-  }
+  if (batchId || taskIdParam) {
+    setResolving(false);
+    return;
+  }
+  if (!workspacePath) return; // keep resolving=true until workspace is ready
   tasksApi
     .getAll(workspacePath, 'IN_PROGRESS')
📝 Committable suggestion
useEffect(() => {
  if (batchId || taskIdParam) {
    setResolving(false);
    return;
  }
  if (!workspacePath) return; // keep resolving=true until workspace is ready
  tasksApi
    .getAll(workspacePath, 'IN_PROGRESS')
    .then((response) => {
      const tasks = response.tasks ?? [];
      if (tasks.length > 0) {
        router.replace(`/execution/${tasks[0].id}`);
      } else {
        setResolving(false);
      }
    })
    .catch(() => {
      setResolving(false);
    });
}, [workspacePath, batchId, taskIdParam, router]);
Resolve 17 issues from multi-angle PR review (3 critical, 7 high, 7 medium):

Critical:
- Remove duplicate SSE stream route from streaming_v2.py (route collision with tasks_v2)
- Add workspace_path to SSE EventSource URL in useTaskStream
- Add aria-live="polite" and role="log" to EventStream for screen readers

High:
- Fix batch status case mismatch (lowercase → UPPERCASE) in BatchExecutionMonitor
- Keep EventStream mounted when task is BLOCKED (enables inline blocker answers)
- Add role="alert" to CompletionBanner variants
- Add aria-expanded to FileChangeEvent and ShellCommandEvent toggle buttons
- Add responsive flex-col/md:flex-row layout for mobile
- Add dark mode variants to badge and connection dot styles
- Add task-not-found error state instead of infinite loading

Medium:
- Add try/catch to single-task handleStop
- Fix overly broad /pass/i regex → /\bpassed?\b/i in VerificationEvent
- Prevent rAF flush from overwriting DISCONNECTED agent state
- Pass value prop to ProgressPrimitive.Root for aria-valuenow
- Clear error state on successful batch fetch
- Add encodeURIComponent to new API path interpolations
- Add SSE disconnection banner with reconnect button
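The regex change in the Medium list is easier to evaluate side by side. The sketch below compares the original pattern, a strict word-boundary variant, and the pattern the commit adopted; the sample messages are made up for illustration and are not taken from real verification output.

```typescript
const broadPass = /pass/i;           // original: matches any substring "pass"
const strictPass = /\bpass\b/i;      // word-bounded "pass" only
const passOrPassed = /\bpassed?\b/i; // adopted in this commit: "pass" or "passed"

// Made-up sample messages.
const sampleMessages = [
  'All checks passed',
  'pass',
  'Bypass lint step',
  'password check failed',
];

const regexComparison = sampleMessages.map((msg) => ({
  msg,
  broad: broadPass.test(msg),
  strict: strictPass.test(msg),
  adopted: passOrPassed.test(msg),
}));
```

The broad pattern accepts all four messages, including "Bypass" and "password". The strict pattern rejects those two but also rejects "passed", which is likely why the commit uses `passed?`. Forms such as "passes" or "passing" are not matched by the adopted pattern either, so it presumably relies on the backend's message wording.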
Code Review: Execution Monitor View
This is a comprehensive implementation of real-time task execution monitoring with SSE streaming. The architecture is well-designed with clear separation of concerns. Here's my detailed feedback:
✅ Strengths
Architecture & Design
Implementation Quality
🔍 Issues to Address (from CodeRabbit review)
I see CodeRabbit identified 7 actionable issues that should be addressed:
1. Error Handling in Task Fetch (
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (4)
codeframe/ui/routers/streaming_v2.py (2)
84-127: ⚠️ Potential issue | 🟠 Major
`heartbeat_interval` parameter is accepted but never used.
The `event_stream_generator` accepts `heartbeat_interval` (default 30s) but the function body never sends heartbeat/keep-alive messages. SSE connections will time out on proxies and load balancers that enforce idle timeouts (commonly 60s). This is likely an incomplete implementation.
A typical pattern is to race `publisher.subscribe` against an `asyncio.sleep(heartbeat_interval)` and yield a comment line on timeout:
💡 Sketch of heartbeat support
 async def event_stream_generator(
     task_id: str,
     publisher: EventPublisher,
     request: Request,
     heartbeat_interval: float = 30.0,
 ) -> AsyncGenerator[str, None]:
     logger.info(f"Starting SSE stream for task {task_id}")
+    subscription = publisher.subscribe(task_id).__aiter__()
     try:
-        async for event in publisher.subscribe(task_id):
-            # Check if client disconnected
+        while True:
             if await request.is_disconnected():
                 logger.info(f"Client disconnected from task {task_id} stream")
                 break
-            yield format_sse_event(event)
-
-            # If this is a completion event, we're done
-            if event.event_type == "completion":
-                logger.info(f"Task {task_id} completed, closing stream")
-                break
+            try:
+                event = await asyncio.wait_for(
+                    subscription.__anext__(), timeout=heartbeat_interval
+                )
+                yield format_sse_event(event)
+                if event.event_type == "completion":
+                    logger.info(f"Task {task_id} completed, closing stream")
+                    break
+            except asyncio.TimeoutError:
+                yield format_sse_comment("heartbeat")
+            except StopAsyncIteration:
+                break
     except asyncio.CancelledError:
         logger.info(f"SSE stream cancelled for task {task_id}")
         raise
22-25: ⚠️ Potential issue | 🟡 Minor
Remove the empty router definition; this module is utilities-only.
The `router` at lines 22–25 is instantiated but has zero routes and adds unnecessary overhead to the app's route matching. Since the actual SSE endpoint is in `tasks_v2.py` (line 726), remove the empty router definition and its mount in `server.py` (line 477).
Additionally, the `heartbeat_interval` parameter at line 88 is accepted by `event_stream_generator` but never used in the function body; either implement heartbeat support or remove the unused parameter.
tests/ui/test_streaming_router.py (2)
11-11: ⚠️ Potential issue | 🔴 Critical
CI failure: unused `TestClient` import.
The pipeline reports `F401 'fastapi.testclient.TestClient' imported but unused`. This is breaking CI. Remove the import.
🔧 Fix
-from fastapi.testclient import TestClient
21-69: ⚠️ Potential issue | 🟡 Minor
Remove unused fixtures and unnecessary imports.
The three fixtures `mock_user`, `mock_workspace`, and `app_with_streaming` are unused; no test methods reference them. Removing them eliminates unnecessary imports including `codeframe.auth.User`, `codeframe.core.workspace.Workspace`, `codeframe.core.tasks`, and `unittest.mock.MagicMock`. Additionally, `TestClient` imported on line 11 is not used anywhere in the file.
🤖 Fix all issues with AI agents
In `@tests/ui/test_streaming_router.py`:
- Around line 162-175: Update the stale module docstring to describe the current
scope (SSE utilities and publisher tests, noting the SSE endpoint was moved to
tasks_v2) and add the v2 pytest marker; specifically, edit the module docstring
at the top of tests/ui/test_streaming_router.py to reflect the move and current
purpose, and add a module-level pytestmark = pytest.mark.v2 (or decorate
TestStreamingRouterEndpoint with `@pytest.mark.v2`) so the tests are marked as v2;
reference the TestStreamingRouterEndpoint class and the router import to ensure
context remains accurate.
In `@web-ui/src/hooks/useTaskStream.ts`:
- Around line 112-115: The SSE URL construction in useTaskStream.ts interpolates
taskId raw into the path; update the url variable to URI-encode taskId (use
encodeURIComponent(taskId)) when building
`${apiBase}/api/v2/tasks/${taskId}/stream...` so the path segment is safe,
keeping workspacePath encoded as it is and preserving the ternary that sets url
to null when taskId or workspacePath are falsy.
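The URL construction described in the last item might look like the sketch below. `buildTaskStreamUrl` is a hypothetical helper, and the `workspace_path` query parameter name and the example host are assumptions based on the API client snippets elsewhere in this thread rather than the hook's actual source.

```typescript
// Build the SSE stream URL, or null when either required input is missing.
// Both the path segment and the query value are URI-encoded.
function buildTaskStreamUrl(
  apiBase: string,
  taskId: string | null,
  workspacePath: string | null,
): string | null {
  if (!taskId || !workspacePath) return null;
  const encodedTaskId = encodeURIComponent(taskId);
  const encodedWorkspace = encodeURIComponent(workspacePath);
  return `${apiBase}/api/v2/tasks/${encodedTaskId}/stream?workspace_path=${encodedWorkspace}`;
}

// Usage with a deliberately awkward task ID (example host is an assumption).
const streamUrl = buildTaskStreamUrl('http://localhost:8000', 'a/b c', '/tmp/ws');
const missingWorkspace = buildTaskStreamUrl('http://localhost:8000', 'task-1', null);
```

Without encoding, a task ID containing `/` would change the path structure and one containing `?` or `#` would truncate it; with encoding, the ID always stays a single path segment.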
🧹 Nitpick comments (8)
web-ui/src/components/execution/ChangesSidebar.tsx (1)
50-66: Consider semantic list markup for accessibility.
The file list uses plain `div` elements. Given the PR's emphasis on accessibility (`aria-live`, `role` attributes elsewhere), using `<ul>`/`<li>` would let screen readers announce item count and improve navigation. The `h-[calc(100%-37px)]` is also a fragile magic number: if the header padding changes, this breaks silently.
♻️ Suggested semantic list markup
 <ScrollArea className="h-[calc(100%-37px)]">
-  <div className="p-2">
+  <ul className="list-none p-2" role="list" aria-label="Changed files">
     {changedFiles.map((filePath) => (
-      <div
+      <li
         key={filePath}
         className="flex items-center gap-1.5 rounded px-2 py-1 text-xs hover:bg-muted/50"
         title={filePath}
       >
         <FileEditIcon className="h-3 w-3 shrink-0 text-muted-foreground" />
         <span className="truncate font-mono">{filePath}</span>
-      </div>
+      </li>
     ))}
-  </div>
+  </ul>
 </ScrollArea>
web-ui/src/components/execution/FileChangeEvent.tsx (2)
23-29: Icon color doesn't reflect the operation type.
The icon is always green, but for "Deleting file" events a red/destructive color would be more intuitive. Consider deriving the color from the matched operation:
💡 Suggested approach
-  const filePath = event.message?.replace(/^(Creating|Editing|Deleting) file:\s*/i, '') ?? '';
+  const match = event.message?.match(/^(Creating|Editing|Deleting) file:\s*(.*)/i);
+  const operation = match?.[1]?.toLowerCase() ?? '';
+  const filePath = match?.[2] ?? event.message ?? '';
+
+  const iconColor = operation === 'deleting'
+    ? 'text-red-500'
+    : operation === 'creating'
+      ? 'text-green-600'
+      : 'text-blue-500';
Then use `iconColor` on the `FileEditIcon`:
-  <FileEditIcon className="h-4 w-4 shrink-0 text-green-600" />
+  <FileEditIcon className={`h-4 w-4 shrink-0 ${iconColor}`} />
30-45: "View Diff" label sets an expectation the placeholder content doesn't fulfill.
The comment on Lines 42–43 acknowledges this is temporary. When actual diff data becomes available, consider also adding an aria-controls attribute on the button pointing to an id on the <pre> element for better screen-reader association.
For now, a small label tweak could reduce user confusion until real diffs land:
💡 Interim label suggestion
- {expanded ? 'Hide' : 'View Diff'}
+ {expanded ? 'Hide' : 'Details'}
web-ui/src/lib/api.ts (1)
204-221: Inconsistent taskId encoding between existing and new task methods.
stopExecution and resumeExecution use encodeURIComponent(taskId) (good), but the pre-existing startExecution (Line 183), getOne (Line 153), and updateStatus (Line 168) interpolate taskId directly without encoding. While task IDs are likely safe strings today, the inconsistency could mask a bug if IDs ever contain special characters.
Consider encoding uniformly across all tasksApi methods for consistency:
♻️ Example for startExecution
 startExecution: async (
   workspacePath: string,
   taskId: string
 ): Promise<TaskStartResponse> => {
   const response = await api.post<TaskStartResponse>(
-    `/api/v2/tasks/${taskId}/start`,
+    `/api/v2/tasks/${encodeURIComponent(taskId)}/start`,
     {},
     { params: { workspace_path: workspacePath, execute: true } }
   );
web-ui/src/hooks/useTaskStream.ts (1)
117-147: Callback stability note: handleMessage depends on all six event callbacks.
If consumers pass inline (non-memoized) callbacks, handleMessage will get a new identity every render. Whether this triggers SSE reconnections depends on useEventSource's internal dependency on onMessage. This is pre-existing behavior, but worth being aware of: callers like useExecutionMonitor should ensure callbacks are stable (e.g., via useCallback).
#!/bin/bash
# Check how useEventSource depends on onMessage: does it reconnect when onMessage changes?
ast-grep --pattern $'export function useEventSource($$$) { $$$ }'
web-ui/__tests__/hooks/useTaskStream.test.ts (2)
9-14: Mock doesn't capture the URL, so the new workspace_path query-param logic is untested.
The useEventSource mock ignores the url argument entirely, so no test verifies that:
- The constructed URL includes ?workspace_path=... when both args are provided.
- The URL is null when taskId is set but workspacePath is null.
Consider capturing the URL to assert on it:
💡 Suggested enhancement
 const mockClose = jest.fn();
 let capturedOnMessage: ((data: string) => void) | undefined;
+let capturedUrl: string | null | undefined;
 jest.mock('@/hooks/useEventSource', () => ({
-  useEventSource: ({ onMessage }: { url: string | null; onMessage?: (data: string) => void }) => {
+  useEventSource: ({ url, onMessage }: { url: string | null; onMessage?: (data: string) => void }) => {
+    capturedUrl = url;
     capturedOnMessage = onMessage;
     return { status: 'open' as const, close: mockClose };
   },
 }));
Then add assertions like:
it('passes null URL when workspacePath is null', () => {
  renderHook(() =>
    useTaskStream({ taskId: 'task-1', workspacePath: null })
  );
  expect(capturedUrl).toBeNull();
});
it('includes workspace_path in URL', () => {
  renderHook(() =>
    useTaskStream({ taskId: 'task-1', workspacePath: '/tmp/ws' })
  );
  expect(capturedUrl).toContain('workspace_path=%2Ftmp%2Fws');
});
22-27: Test covers taskId: null but not the workspacePath: null (with valid taskId) edge case.
The hook should produce a null URL when only workspacePath is null. Adding a test for { taskId: 'task-1', workspacePath: null } would strengthen coverage of the guard at Line 113 in the source.
codeframe/ui/routers/streaming_v2.py (1)
14-15: APIRouter import is only used for the empty router; StreamingResponse relies on downstream re-import.
APIRouter is only used to create the empty router on line 22. If the router is removed (per the earlier suggestion), this import becomes dead code. The StreamingResponse re-export (with # noqa: F401) is fine as long as consumers actually import it from here.
 class TestStreamingRouterEndpoint:
-    """Tests for streaming router endpoint configuration."""
+    """Tests for streaming router configuration.

-    def test_endpoint_exists(self, app_with_streaming):
-        """The streaming endpoint should be registered."""
-        client = TestClient(app_with_streaming)
+    NOTE: The SSE stream endpoint (GET /api/v2/tasks/{task_id}/stream) was
+    moved to tasks_v2.py to avoid a route collision with the JWT-auth version.
+    The tasks_v2 version only requires workspace_path, making it compatible
+    with browser EventSource which cannot send custom auth headers.
+    """

-        # Get the OpenAPI schema to verify endpoint exists
-        response = client.get("/openapi.json")
-        assert response.status_code == 200
+    def test_streaming_router_has_no_endpoints(self):
+        """streaming_v2 router should have no endpoints (utilities only)."""
+        from codeframe.ui.routers.streaming_v2 import router

-        schema = response.json()
-        paths = schema.get("paths", {})
-
-        # Verify the stream endpoint is registered
-        assert "/api/v2/tasks/{task_id}/stream" in paths
-        assert "get" in paths["/api/v2/tasks/{task_id}/stream"]
-
-    def test_endpoint_returns_streaming_response(self, app_with_streaming):
-        """The endpoint should return a streaming response with SSE content type."""
-        from codeframe.core.streaming import EventPublisher
-        from codeframe.ui.routers import streaming_v2
-
-        # Inject a publisher that immediately completes the task
-        publisher = EventPublisher()
-        streaming_v2.set_event_publisher(publisher)
-
-        try:
-            client = TestClient(app_with_streaming)
-
-            # Use stream=True but set a short timeout via the client
-            # and complete the task immediately
-            import threading
-            import time
-
-            def complete_task():
-                time.sleep(0.1)
-                import asyncio
-                loop = asyncio.new_event_loop()
-                loop.run_until_complete(publisher.complete_task("test-task"))
-                loop.close()
-
-            thread = threading.Thread(target=complete_task)
-            thread.start()
-
-            # The TestClient doesn't easily support streaming,
-            # so we just verify the endpoint starts without error
-            # Real streaming tests require async client
-            with client.stream("GET", "/api/v2/tasks/test-task/stream") as response:
-                assert response.status_code == 200
-                assert "text/event-stream" in response.headers.get("content-type", "")
-                # Read at most a small amount before breaking
-                break_after = 0
-                for _ in response.iter_lines():
-                    break_after += 1
-                    if break_after > 0:
-                        break
-
-            thread.join(timeout=2.0)
-        finally:
-            streaming_v2.set_event_publisher(None)
+        assert len(router.routes) == 0

🛠️ Refactor suggestion | 🟠 Major
The endpoint-absence test is useful but the class docstring and module docstring are now stale.
The module docstring (lines 1-5) still says "Tests for… the /api/v2/tasks/{task_id}/stream SSE endpoint", and the class docstring mentions the move but the module-level doc doesn't reflect it. Consider updating the module docstring to match the current scope (SSE utilities + publisher tests).
Also, per coding guidelines for tests/**/*.py: tests should be marked with @pytest.mark.v2 or a module-level pytestmark = pytest.mark.v2. This file has no such marker.
💡 Suggested additions
"""Tests for SSE streaming router.
-TDD: Tests written first to define expected behavior of
-the /api/v2/tasks/{task_id}/stream SSE endpoint.
+Tests for shared SSE utilities (formatting, publisher management)
+and verification that the streaming_v2 router has no endpoints.
"""
+
+import pytest
+
+pytestmark = pytest.mark.v2
As per coding guidelines, tests/**/*.py: "Mark v2 tests with @pytest.mark.v2 decorator or pytestmark = pytest.mark.v2 module-level declaration."
🤖 Prompt for AI Agents
In `@tests/ui/test_streaming_router.py` around lines 162 - 175, Update the stale
module docstring to describe the current scope (SSE utilities and publisher
tests, noting the SSE endpoint was moved to tasks_v2) and add the v2 pytest
marker; specifically, edit the module docstring at the top of
tests/ui/test_streaming_router.py to reflect the move and current purpose, and
add a module-level pytestmark = pytest.mark.v2 (or decorate
TestStreamingRouterEndpoint with `@pytest.mark.v2`) so the tests are marked as v2;
reference the TestStreamingRouterEndpoint class and the router import to ensure
context remains accurate.
const url =
  taskId && workspacePath
    ? `${apiBase}/api/v2/tasks/${taskId}/stream?workspace_path=${encodeURIComponent(workspacePath)}`
    : null;

taskId is not URI-encoded in the SSE URL path segment.
workspacePath is correctly encoded as a query parameter, but taskId is interpolated raw into the path. In api.ts, the new stopExecution/resumeExecution methods encode taskId via encodeURIComponent. The same should be done here to prevent broken URLs or path traversal if a task ID ever contains special characters (e.g., /, #, %).
🔧 Proposed fix
const url =
taskId && workspacePath
- ? `${apiBase}/api/v2/tasks/${taskId}/stream?workspace_path=${encodeURIComponent(workspacePath)}`
+ ? `${apiBase}/api/v2/tasks/${encodeURIComponent(taskId)}/stream?workspace_path=${encodeURIComponent(workspacePath)}`
: null;
🤖 Prompt for AI Agents
In `@web-ui/src/hooks/useTaskStream.ts` around lines 112 - 115, The SSE URL
construction in useTaskStream.ts interpolates taskId raw into the path; update
the url variable to URI-encode taskId (use encodeURIComponent(taskId)) when
building `${apiBase}/api/v2/tasks/${taskId}/stream...` so the path segment is
safe, keeping workspacePath encoded as it is and preserving the ternary that
sets url to null when taskId or workspacePath are falsy.
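A minimal sketch of the encoded URL construction described above, written as a standalone function so it can be checked in isolation. `buildStreamUrl` is a hypothetical helper name and is not part of the PR; the path and the `workspace_path` parameter follow the snippet under review.

```typescript
// Hypothetical helper (not in the PR): builds the SSE URL with both the
// path segment and the query parameter percent-encoded.
function buildStreamUrl(
  apiBase: string,
  taskId: string | null,
  workspacePath: string | null,
): string | null {
  // Preserve the original guard: no URL unless both values are present.
  if (!taskId || !workspacePath) return null;
  return (
    `${apiBase}/api/v2/tasks/${encodeURIComponent(taskId)}/stream` +
    `?workspace_path=${encodeURIComponent(workspacePath)}`
  );
}

// A task ID containing "/" or "#" stays inside its own path segment.
console.log(buildStreamUrl("http://localhost:8000", "a/b#1", "/tmp/ws"));
```

Encoding the path segment means a stray `/` or `#` in an ID cannot change which route the request hits or truncate the URL at a fragment.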
      : 'CONNECTING';

// Don't let rAF flush overwrite DISCONNECTED state
if (sseStatusRef.current === 'error' || sseStatusRef.current === 'closed') {

🟢 Low
hooks/useExecutionMonitor.ts:104 The DISCONNECTED override in scheduleFlush doesn't check if the task completed. When SSE closes after a CompletionEvent, this overwrites the correct COMPLETED state with DISCONNECTED. Consider checking for completion before overriding (e.g., if (!completion && (sseStatusRef.current === 'error' || ...))).
- if (sseStatusRef.current === 'error' || sseStatusRef.current === 'closed') {
+ if (!completion && (sseStatusRef.current === 'error' || sseStatusRef.current === 'closed')) {
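The precedence rule in this comment can be sketched as a pure function, which makes it easy to unit test apart from the hook. The `SseStatus` and `AgentState` unions and the name `resolveAgentState` are simplified assumptions for illustration; the PR's actual `UIAgentState` type may have different members.

```typescript
// Assumed, simplified unions; the real types in the PR may differ.
type SseStatus = "connecting" | "open" | "error" | "closed";
type AgentState = "CONNECTING" | "EXECUTING" | "COMPLETED" | "DISCONNECTED";

// Hypothetical helper: decides whether a lost connection should override
// the state derived from the event stream.
function resolveAgentState(
  derived: AgentState,
  sseStatus: SseStatus,
  hasCompletion: boolean,
): AgentState {
  const connectionLost = sseStatus === "error" || sseStatus === "closed";
  // A finished task keeps its terminal state even after the stream closes.
  if (connectionLost && !hasCompletion) return "DISCONNECTED";
  return derived;
}

console.log(resolveAgentState("COMPLETED", "closed", true));
console.log(resolveAgentState("EXECUTING", "error", false));
```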
// ── Event callbacks ───────────────────────────────────────────────
const onEvent = useCallback(
  (event: ExecutionEvent) => {
    eventsRef.current = [...eventsRef.current, event];

🟢 Low
hooks/useExecutionMonitor.ts:139 Spreading the entire array on every event causes O(N²) copying over the life of the stream. Consider using push() instead for amortized O(1) appends.
- eventsRef.current = [...eventsRef.current, event];
+ eventsRef.current.push(event);
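One way to combine in-place appends with a fresh array for rendering is a small buffer that is mutated per event and copied only when flushed. This is a sketch under assumptions: `BoundedEventLog` and its size cap are hypothetical and not part of the PR, and the cap is an optional extra rather than something this comment asks for.

```typescript
// Hypothetical buffer (not in the PR): appends in place and hands out a
// copy only when the UI flushes.
class BoundedEventLog<T> {
  private items: T[] = [];
  private readonly maxSize: number;

  constructor(maxSize: number) {
    if (maxSize < 1 || Math.floor(maxSize) !== maxSize) {
      throw new RangeError("maxSize must be a positive integer");
    }
    this.maxSize = maxSize;
  }

  push(item: T): void {
    this.items.push(item);
    // Trim lazily at twice the cap so trimming cost is amortized.
    if (this.items.length >= 2 * this.maxSize) {
      this.items = this.items.slice(-this.maxSize);
    }
  }

  // Returns a new array holding at most the last maxSize items.
  snapshot(): T[] {
    return this.items.slice(-this.maxSize);
  }
}

const eventLog = new BoundedEventLog<number>(3);
for (let i = 1; i <= 7; i++) eventLog.push(i);
console.log(eventLog.snapshot()); // [5, 6, 7]
```

Because `snapshot()` returns a new array each time, passing its result to a state setter still gives React a changed reference, which a bare `push()` on the ref would not.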
}, [batch?.status, fetchBatch]);

// Auto-expand the first IN_PROGRESS task
useEffect(() => {

🟢 Low
execution/BatchExecutionMonitor.tsx:102 The auto-expand effect re-runs when expandedTaskId becomes null (after user collapse), immediately re-expanding the IN_PROGRESS task. Consider using a ref to track if auto-expansion has already occurred, so user actions aren't overridden.
Code Review: Execution Monitor View (#332)
Overview
This is a well-architected implementation of real-time execution monitoring with SSE streaming. The PR successfully delivers both single-task and batch execution monitoring with comprehensive test coverage (53 new tests). The code demonstrates strong engineering practices with clear separation of concerns and thoughtful performance optimizations.
✅ Strengths
Architecture & Performance
Testing
UX

Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@web-ui/src/components/execution/BatchExecutionMonitor.tsx`:
- Around line 63-82: The polling logic currently clears and sets the global
error state on every background poll, letting a transient failure replace the
whole batch UI; change fetchBatch to accept an isInitial boolean (or use an
isInitialFetchRef) and only mutate error state for the initial load: remove the
unconditional setError(null) at the top of fetchBatch, call fetchBatch(true) on
mount and fetchBatch(false) for periodic polls, and inside fetchBatch only call
setError('Failed to load batch details') when isInitial is true (and clear error
only on a successful initial fetch). Keep the task-fetching logic
(fetchedTaskIdsRef, tasksApi.getOne, setTasks) unchanged.
- Around line 102-108: The effect auto-expanding IN_PROGRESS tasks overrides a
user's manual collapse because expandedTaskId is in the dependency array; add a
ref (e.g., userInteractedRef) to track manual toggles, set
userInteractedRef.current = true inside the task toggle handler (the function
that calls setExpandedTaskId), then update the useEffect that auto-expands (the
useEffect referencing batch, expandedTaskId and calling setExpandedTaskId) to
skip auto-expand when userInteractedRef.current is true; also reset
userInteractedRef.current = false when a new batch arrives (e.g., when batch.id
changes) so auto-expand can run for new batches.
🧹 Nitpick comments (2)
web-ui/src/components/execution/BatchExecutionMonitor.tsx (2)
269-308: Events disappear when a task completes while expanded.
When status transitions from IN_PROGRESS to COMPLETED, shouldStream becomes false and the EventStream is replaced with the static "Task completed successfully." message. The user loses visibility of all accumulated events mid-view.
Consider keeping the EventStream visible (read-only) for completed tasks when the user already has the row expanded, e.g.:
- const shouldStream = isExpanded && (status === 'IN_PROGRESS' || status === 'BLOCKED');
+ const shouldStream = status === 'IN_PROGRESS' || status === 'BLOCKED';
+ const showEvents = isExpanded && (shouldStream || monitor.events.length > 0);
Then render EventStream whenever showEvents is true, so historical events remain visible after completion.
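The two flags from this suggestion can be sketched as a pure function to make the intended behavior explicit. `deriveRowFlags` is a hypothetical name, and the status is typed as a plain string here because the full status union is not shown in this thread.

```typescript
// Hypothetical helper (not in the PR): computes the suggested flags for
// one accordion row.
function deriveRowFlags(
  isExpanded: boolean,
  taskStatus: string,
  eventCount: number,
): { shouldStream: boolean; showEvents: boolean } {
  // Stream while the task can still emit events.
  const shouldStream = taskStatus === "IN_PROGRESS" || taskStatus === "BLOCKED";
  // Keep already-received events visible after the task finishes.
  const showEvents = isExpanded && (shouldStream || eventCount > 0);
  return { shouldStream, showEvents };
}

// An expanded row that just completed still shows its 12 events.
console.log(deriveRowFlags(true, "COMPLETED", 12));
```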
86-88: Consider adding fetchBatch to the dependency array instead of disabling the lint rule.
Since fetchBatch is a useCallback with [workspacePath, batchId] as deps, adding it to the effect's dependency array is semantically equivalent and avoids the eslint-disable comment.
 useEffect(() => {
   fetchBatch();
- }, [batchId, workspacePath]); // eslint-disable-line react-hooks/exhaustive-deps
+ }, [fetchBatch]);
const fetchBatch = useCallback(async () => {
  try {
    setError(null);
    const data = await batchesApi.get(workspacePath, batchId);
    setBatch(data);

    // Fetch task details for any new task IDs (check ref, not state)
    for (const taskId of data.task_ids) {
      if (!fetchedTaskIdsRef.current.has(taskId)) {
        fetchedTaskIdsRef.current.add(taskId);
        tasksApi.getOne(workspacePath, taskId).then((task) => {
          setTasks((prev) => ({ ...prev, [taskId]: task }));
        }).catch((err) => {
          console.error(`Failed to fetch task ${taskId}:`, err);
        });
      }
    }
  } catch {
    setError('Failed to load batch details');
  }

Transient poll failure replaces the entire batch view with an error message.
setError(null) on line 65 clears any previous error at the start of every poll, and a subsequent failure on line 81 sets a new error. Because the render on line 131 short-circuits to show only the error box when error is set, a single transient network hiccup during polling will hide all batch progress from the user.
Consider either:
- Only showing the error as a non-blocking toast/banner alongside the batch UI, or
- Not clearing/setting error during background polls (only on the initial fetch).
Option 2: skip error-state mutation during polling
One approach — track whether this is the initial fetch vs. a background poll, and only surface errors prominently on the initial load:
- const fetchBatch = useCallback(async () => {
+ const fetchBatch = useCallback(async (isInitial = false) => {
try {
- setError(null);
+ if (isInitial) setError(null);
const data = await batchesApi.get(workspacePath, batchId);
setBatch(data);
// ... task fetching ...
} catch {
- setError('Failed to load batch details');
+ if (isInitial) setError('Failed to load batch details');
+ else console.error('Background poll failed for batch', batchId);
}
}, [workspacePath, batchId]);
Then in the initial-fetch effect:
useEffect(() => {
- fetchBatch();
+ fetchBatch(true);
}, [batchId, workspacePath]);
Also applies to: 131-137
🤖 Prompt for AI Agents
In `@web-ui/src/components/execution/BatchExecutionMonitor.tsx` around lines 63 -
82, The polling logic currently clears and sets the global error state on every
background poll, letting a transient failure replace the whole batch UI; change
fetchBatch to accept an isInitial boolean (or use an isInitialFetchRef) and only
mutate error state for the initial load: remove the unconditional setError(null)
at the top of fetchBatch, call fetchBatch(true) on mount and fetchBatch(false)
for periodic polls, and inside fetchBatch only call setError('Failed to load
batch details') when isInitial is true (and clear error only on a successful
initial fetch). Keep the task-fetching logic (fetchedTaskIdsRef,
tasksApi.getOne, setTasks) unchanged.
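The rule described in this prompt (only the initial load may set or clear the blocking error) can be sketched as a pure state transition. The combined `BatchViewState` shape, the `FetchOutcome` type, and `applyFetchOutcome` are hypothetical names introduced for illustration; the component under review keeps these values in separate state hooks.

```typescript
// Assumed shapes for illustration only; not taken from the PR.
type BatchViewState<B> = { batch: B | null; error: string | null };
type FetchOutcome<B> = { kind: "ok"; data: B } | { kind: "failed" };

function applyFetchOutcome<B>(
  state: BatchViewState<B>,
  outcome: FetchOutcome<B>,
  isInitial: boolean,
): BatchViewState<B> {
  if (outcome.kind === "ok") {
    // Fresh data always replaces the batch; only the initial load clears the error.
    return { batch: outcome.data, error: isInitial ? null : state.error };
  }
  if (isInitial) {
    // Nothing to show yet, so the blocking error is appropriate.
    return { batch: state.batch, error: "Failed to load batch details" };
  }
  // Background poll failed: keep the last good batch on screen.
  return state;
}

const loaded = applyFetchOutcome<string>({ batch: null, error: null }, { kind: "ok", data: "batch-1" }, true);
console.log(applyFetchOutcome(loaded, { kind: "failed" }, false));
```

A failed background poll returns the previous state unchanged, so the batch view stays visible through a transient network error.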
useEffect(() => {
  if (!batch || expandedTaskId) return;
  const inProgress = batch.task_ids.find(
    (id) => batch.results[id] === 'IN_PROGRESS'
  );
  if (inProgress) setExpandedTaskId(inProgress);
}, [batch, expandedTaskId]);

Auto-expand overrides user's manual collapse of an IN_PROGRESS task.
When the user collapses an expanded task (setting expandedTaskId to null), this effect immediately re-runs because expandedTaskId is in the dependency array. If there's still an IN_PROGRESS task, it re-expands, making it impossible for the user to collapse it.
A simple fix is to track whether the user has manually toggled, and skip auto-expand after that.
Proposed fix using a "user has interacted" ref
+ const userToggledRef = useRef(false);
+
// Auto-expand the first IN_PROGRESS task
useEffect(() => {
- if (!batch || expandedTaskId) return;
+ if (!batch || expandedTaskId || userToggledRef.current) return;
const inProgress = batch.task_ids.find(
(id) => batch.results[id] === 'IN_PROGRESS'
);
if (inProgress) setExpandedTaskId(inProgress);
}, [batch, expandedTaskId]);
Then in the toggle handler (line 237–238):
onToggle={() => {
+ userToggledRef.current = true;
setExpandedTaskId(expandedTaskId === taskId ? null : taskId);
}}
🤖 Prompt for AI Agents
In `@web-ui/src/components/execution/BatchExecutionMonitor.tsx` around lines 102 -
108, The effect auto-expanding IN_PROGRESS tasks overrides a user's manual
collapse because expandedTaskId is in the dependency array; add a ref (e.g.,
userInteractedRef) to track manual toggles, set userInteractedRef.current = true
inside the task toggle handler (the function that calls setExpandedTaskId), then
update the useEffect that auto-expands (the useEffect referencing batch,
expandedTaskId and calling setExpandedTaskId) to skip auto-expand when
userInteractedRef.current is true; also reset userInteractedRef.current = false
when a new batch arrives (e.g., when batch.id changes) so auto-expand can run
for new batches.
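The decision inside the auto-expand effect can be sketched as a pure function that takes the "user has toggled" flag as an input. `pickAutoExpandTask` and the `BatchSnapshot` type are hypothetical; only the `task_ids` and `results` fields are taken from the snippet under review.

```typescript
// Assumed minimal batch shape, based on the fields used in the effect.
type BatchSnapshot = { task_ids: string[]; results: Record<string, string> };

// Hypothetical helper: returns the task to auto-expand, or null to leave
// the current expansion state alone.
function pickAutoExpandTask(
  batch: BatchSnapshot | null,
  expandedTaskId: string | null,
  userToggled: boolean,
): string | null {
  // Never override an explicit user choice or an already expanded row.
  if (batch === null || expandedTaskId !== null || userToggled) return null;
  for (const id of batch.task_ids) {
    if (batch.results[id] === "IN_PROGRESS") return id;
  }
  return null;
}

const sampleBatch: BatchSnapshot = {
  task_ids: ["t1", "t2"],
  results: { t1: "COMPLETED", t2: "IN_PROGRESS" },
};
console.log(pickAutoExpandTask(sampleBatch, null, false)); // "t2"
console.log(pickAutoExpandTask(sampleBatch, null, true)); // null
```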
Code Review: Execution Monitor View (#332)
Overview
This is an excellent implementation of real-time execution monitoring with SSE streaming. The PR successfully delivers both single-task and batch execution monitoring with comprehensive test coverage (53 new tests, 253 total passing). The architecture demonstrates strong engineering practices, and the iterative improvements from previous review rounds show excellent responsiveness to feedback.
✅ Strengths
Architecture & Design Patterns
State Management
Accessibility
Error Handling
Test Coverage
Backend Cleanup
🔍 Areas for Consideration
1. Reconnection Logic (Medium Priority)
The disconnection banner uses window.location.reload(), which loses accumulated events and scroll position. SSE connections naturally reconnect on transient failures. Consider a manual close() → re-mount pattern, or document why reload is needed.
2. Silent Catch Handlers (Low Priority)
console.error was added per previous reviews, but some catch blocks don't update UI error state, so users won't know why a task row is empty. Consider inline error indicators.
3. Type Safety - Event Discrimination
Event type discrimination via string checks is safe but fragile if backend message formats change. Consider structured event subtypes from the backend as a future enhancement.
🎯 CodeFRAME Alignment ✅
🛡️ Security ✅
📊 Performance
Optimizations Present ✅
Potential Concerns
🎉 Summary
Verdict: Approve with minor suggestions
This PR demonstrates excellent engineering:
Minor improvements to consider:
Overall: Ready to merge. The suggestions above are non-blocking polish items that can be addressed in follow-up PRs if needed.
Reviewed by: Claude Sonnet 4.5 (code-reviewer)
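The type-safety point in the review above can be illustrated with a runtime type guard that validates an incoming payload before it is treated as a specific event. This is a sketch under assumptions: `TaskProgressEvent`, `isTaskProgressEvent`, and `describePhase` are hypothetical names, and only the `event_type` and `phase` fields are taken from this thread.

```typescript
// Assumed event shape; the PR's real event types may carry more fields.
type TaskProgressEvent = { event_type: "progress"; phase: string };

// Runtime guard: narrows an unknown payload to TaskProgressEvent.
function isTaskProgressEvent(value: unknown): value is TaskProgressEvent {
  if (typeof value !== "object" || value === null) return false;
  const candidate = value as Record<string, unknown>;
  return candidate["event_type"] === "progress" && typeof candidate["phase"] === "string";
}

// Hypothetical consumer: parses a raw SSE data string and reads `phase`
// only after the guard has confirmed the shape.
function describePhase(raw: string): string {
  const parsed: unknown = JSON.parse(raw);
  return isTaskProgressEvent(parsed) ? `phase: ${parsed.phase}` : "not a progress event";
}

console.log(describePhase('{"event_type":"progress","phase":"planning"}'));
```

Unlike an `as` cast, the guard fails closed: a payload with a missing or non-string `phase` is rejected instead of flowing into rendering code.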
{isExpanded && (
  <div className="border-t px-4 pb-4 pt-2">
    {shouldStream ? (
      <div className="h-64">

🟢 Low
execution/BatchExecutionMonitor.tsx:294 EventStream uses flex-1 and h-full internally, but the parent div is a block with fixed height. Consider adding flex flex-col overflow-hidden to the wrapper so the internal layout resolves correctly and scrolling works.
- <div className="h-64">
+ <div className="flex h-64 flex-col overflow-hidden">
if (batchId || taskIdParam || !workspacePath) {
  setResolving(false);
  return;
}

🟢 Low
execution/page.tsx:65 When workspacePath becomes available after hydration, resolving is already false (set during the initial mount when !workspacePath was true). Consider resetting resolving to true before starting the API call, or restructuring the condition so the fetch effect only sets resolving(false) after attempting the API call.
- if (batchId || taskIdParam || !workspacePath) {
-   setResolving(false);
+ if (batchId || taskIdParam) {
+   setResolving(false);
+   return;
+ }
+
+ if (!workspacePath) {
    return;
  }
// Initial fetch
useEffect(() => {
  fetchBatch();
}, [batchId, workspacePath]); // eslint-disable-line react-hooks/exhaustive-deps

🟢 Low
execution/BatchExecutionMonitor.tsx:85 Consider resetting fetchedTaskIdsRef and tasks state when batchId changes; otherwise stale task data persists across batch switches during client-side navigation.
- // Initial fetch
- useEffect(() => {
-   fetchBatch();
- }, [batchId, workspacePath]); // eslint-disable-line react-hooks/exhaustive-deps
+ // Reset cached data and fetch when batch changes
+ useEffect(() => {
+   fetchedTaskIdsRef.current.clear();
+   setTasks({});
+   fetchBatch();
+ }, [batchId, workspacePath]); // eslint-disable-line react-hooks/exhaustive-deps
Three issues prevented SSE events from reaching the frontend:
- /stream endpoint sent raw text lines, not structured JSON events
- start_single_task didn't pass event_publisher to execute_agent
- Synchronous execution blocked before SSE could connect
Changes:
- Rename raw output endpoint to GET /{task_id}/output
- Add structured SSE endpoint at GET /{task_id}/stream using EventPublisher
- Run agent in background thread with event_publisher wired through
- Add 15s heartbeat comments to event_stream_generator
- Update streaming tests for renamed paths
Combine error + detail fields in structured API errors so the UI displays "Cannot execute: No tasks ready" instead of just the summary.
Code Review: Execution Monitor View with Real-Time SSE Streaming
This is a comprehensive and well-structured implementation that delivers a complete real-time execution monitoring experience. The code quality is high, with strong attention to accessibility, performance optimization, and test coverage.
✅ Strengths
1. Excellent Architecture & Separation of Concerns
2. Performance Optimizations
3. Accessibility
4. User Experience
5. Error Handling
6. Test Coverage
🔍 Areas for Consideration
1. Potential Memory Leak in Event Accumulation
Location:
eventsRef.current = [...eventsRef.current, event];
Issue: Events accumulate unbounded in memory. For long-running tasks with verbose output, this could grow to thousands of events.
Recommendation: Consider adding a sliding window (e.g., keep last 500 events) or implement virtual scrolling for very long event streams.
// Example fix
eventsRef.current = [...eventsRef.current, event].slice(-500);
2. XSS Risk in Event Display
Location: Event components display user/server-provided content
Issue: While React escapes strings by default, the
Current State: Likely safe due to React's automatic escaping, but worth verifying the backend sanitizes these fields.
Recommendation: Add explicit sanitization or use a library like DOMPurify if rich text is ever needed.
3. Race Condition in Batch Polling
Location:
Issue: The
Current Impact: Minimal since workspace/batch rarely change, but could cause duplicate requests during transitions.
Recommendation: Wrap
4. Missing Error Boundary
Location: Execution pages and BatchExecutionMonitor
Issue: If any component throws during render (e.g., malformed event data), the entire page crashes with no recovery.
Recommendation: Wrap the main execution monitor components in an Error Boundary that shows a "Something went wrong" message and allows retry.
5. SSE Reconnection Strategy
Location:
Issue: When SSE connection is lost (
Current State: Works for manual refresh, but users might not understand they need to refresh.
Recommendation: Consider adding automatic exponential backoff reconnection, or a "Reconnect" button in the DISCONNECTED state.
6. Type Safety on Event Types
Location:
Issue: Type assertions without guards:
const { phase } = event as ProgressEvent;
Recommendation: Add runtime type guards or use discriminated unions more safely:
if (event.event_type === 'progress' && 'phase' in event) {
  const { phase } = event;
  // ...
}
🔒 Security Review
✅ Good Practices
Code Review - PR #339: Execution Monitor View
Overview
This PR implements a comprehensive real-time execution monitoring UI for the CodeFRAME web interface, adding SSE-based streaming, live event rendering, and batch execution monitoring. The implementation is well-architected with 19 new files, 53 new tests (253 total passing), and follows React/Next.js best practices.
✅ Strengths
1. Excellent Architecture & Separation of Concerns
2. Performance Optimizations
3. Robust Error Handling
4. Accessibility
5. Test Coverage
🔍 Issues & Recommendations
High Priority
1. Potential XSS Vulnerability in Event Rendering 🔴
Location: Multiple event components (FileChangeEvent, ShellCommandEvent, VerificationEvent)
Issue: If backend event data contains user-controlled content (file paths, shell commands, error messages), rendering them directly could expose XSS risks.
Recommendation:
2. Missing Error Boundaries 🟡
Location: EventStream.tsx - event rendering
Issue: If any event fails to render, the entire stream crashes.
Recommendation: Add error boundary around EventItem to prevent cascading failures
3. Race Condition in Batch Polling 🟡
Location:
Issue:
Recommendation: Remove
Medium Priority
4. Memory Leak: Unbounded Event Accumulation 🟡
Location:
Issue: Events accumulate indefinitely. For long-running tasks, this could consume significant memory.
Recommendation: Implement a sliding window (MAX_EVENTS = 1000) to limit memory usage
5. Type Safety: Status String Literals 🟡
Location:
Issue: Using string literals for status checks instead of type unions.
Recommendation: Use TypeScript const assertions for type-safe status checks
6. Missing Loading States 🟡
Location:
Issue: No loading indicator during stop operation. User gets no feedback.
Recommendation: Add loading state during async operations
Low Priority
7. Console.error Instead of Proper Error Reporting 🔵
Multiple instances of
Recommendation: Use toast notifications or error states to inform users.
8. Hardcoded Magic Numbers 🔵
Recommendation: Extract to named constants
9. Accessibility: Focus Management 🔵
Location:
When a blocker appears, focus should automatically move to the textarea for keyboard users.
🔒 Security Considerations
🧪 Testing Recommendations
Add Integration Tests For:
📊 Performance Considerations
Good:
Monitor:
✨ Final Verdict
Overall Assessment: ⭐⭐⭐⭐ (4/5)
This is a well-crafted implementation with strong architecture, good test coverage, and thoughtful UX. The primary concerns are around potential XSS vulnerabilities, missing error boundaries, and the batch polling race condition.
Recommendation: Approve with Changes
Must Fix Before Merge:
Can Address in Follow-up:
Great work on this feature! The execution monitor significantly improves the user experience for real-time task monitoring. 🚀

Actionable comments posted: 4
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
codeframe/ui/routers/streaming_v2.py (1)
22-25: ⚠️ Potential issue | 🟡 Minor
Remove the unused router object or move the stream endpoint to streaming_v2.py.
The APIRouter is mounted in server.py:477 but has no routes registered on it; all routes are defined on tasks_v2.router instead. This adds unnecessary overhead and contradicts the guideline that "streaming router must provide /api/v2/tasks/{id}/stream endpoint." Either remove this router entirely (since the module now serves only as SSE utilities) or relocate the stream_task_events endpoint from tasks_v2.py:855 here.
🤖 Fix all issues with AI agents
In `@codeframe/ui/routers/streaming_v2.py`:
- Around line 143-147: Add a public unsubscribe method on EventPublisher (e.g.,
EventPublisher.unsubscribe(subscription)) and use it from the finally block
instead of touching publisher._lock and publisher._subscribers directly;
implement unsubscribe to take care of acquiring the internal lock, safely
locating the subscriber list for subscription.task_id (use get to avoid
KeyError), remove the subscription if present, and delete the task_id key only
if the list is empty (avoiding recreating behavior for defaultdicts), so both
the setup (where subscriptions are added) and teardown use the same safe, public
API.
- Around line 107-114: The code is directly importing and using the private
_Subscription class and touching publisher._lock and publisher._subscribers;
replace that with the public EventPublisher.subscribe(task_id) call and let the
publisher manage queue/lock/subscriber lifecycle. Call
publisher.subscribe(task_id) to obtain the subscription object (or subscription
handle) and then implement HTTP-specific behavior (heartbeat using
heartbeat_interval, request disconnection monitoring and cleanup) on top of that
returned handle rather than mutating publisher._subscribers or publisher._lock
or importing _Subscription. Ensure you remove direct references to
_Subscription, publisher._lock, and publisher._subscribers and use the public
subscribe/unsubscribe APIs for cleanup.
In `@codeframe/ui/routers/tasks_v2.py`:
- Around line 855-896: Add the standard rate limit decorator to the SSE
endpoint: apply `@rate_limit_standard()` directly above the async def
stream_task_events(...) function (same style as used on
stream_task_output_lines) so the long-lived Server-Sent Events connection is
subject to the same request limits; ensure the decorator is properly imported or
already available in the module and keep the decorator placement immediately
above the function definition.
- Around line 604-626: The current _run_agent fire-and-forget uses a daemon
thread and doesn't manage lifecycle or protect the error-publishing calls; fix
by 1) replacing the daemon behavior with tracked threads: before starting, check
a shared registry (e.g., active_agent_threads keyed by task_id) to avoid
duplicate runs and register the Thread instance
(threading.Thread(target=_run_agent)) so it can be joined on shutdown; 2) make
the thread non-daemon and ensure you remove the entry from active_agent_threads
when finished (in a finally block) so state stays consistent; and 3) harden the
exception handler inside _run_agent: wrap publisher.publish_sync and
publisher.complete_task_sync in their own try/except blocks (log any failures
with logger.error including exception info and error_type), and ensure a final
attempt to mark the task complete or failed (using task_id and ErrorEvent) in a
finally section so runtime.execute_agent failures cannot leave tasks IN_PROGRESS
and publishing errors are surfaced.
🧹 Nitpick comments (3)
web-ui/__tests__/lib/api.test.ts (1)
1-68: Consider adding unit tests for blockersApi, batchesApi, and tasksApi.stopExecution/resumeExecution.
The test file currently only covers normalizeErrorDetail. The new API methods lack dedicated unit tests that verify correct URL construction, parameter passing, and error handling, especially the encodeURIComponent behavior on user-provided IDs (blockerId, batchId, taskId) in endpoint paths.
codeframe/ui/routers/tasks_v2.py (1)
598-602: Inline imports from streaming_v2 and core.models inside the endpoint handler.
These imports are deferred to avoid circular dependencies, but it makes the dependency graph opaque. Consider whether the circular dependency can be resolved at the module level (e.g., by restructuring imports or using a shared interface). If the inline import is necessary, a brief comment explaining why would help future maintainers.
tests/ui/test_v2_routers_integration.py (1)
748-779: Good 404 coverage, but no integration tests for happy-path streaming or background execution.
The renamed/new tests correctly validate 404 responses for both /output and /stream endpoints. However, the following scenarios introduced in this PR lack integration test coverage:
- start_single_task with execute=True: the background thread execution path (lines 598-629 in tasks_v2.py) is untested.
- stream_task_events for an existing task: no test verifies that a 200 with text/event-stream content type is returned when the task exists.
- Behavioral difference: /output returns 404 when no run exists, but /stream returns 200 (streaming). This asymmetry isn't tested or documented.
These are understandable to defer (PR summary mentions "manual interactive checks remain"), but worth tracking.
Do you want me to generate integration test stubs for the background execution and structured event streaming happy paths, or open an issue to track this?
    queue: asyncio.Queue = asyncio.Queue(maxsize=1000)
    loop = asyncio.get_running_loop()

    from codeframe.core.streaming import _Subscription
    subscription = _Subscription(task_id, queue, loop)

    async with publisher._lock:
        publisher._subscribers[task_id].append(subscription)
🧩 Analysis chain
🏁 Script executed:
# First, let's look at the full streaming_v2.py file to see the context around lines 107-114 and 143-147
fd streaming_v2.py --type f
Repository: frankbria/codeframe
Length of output: 99
🏁 Script executed:
# Find the core streaming module
fd streaming.py --type f | grep core
Repository: frankbria/codeframe
Length of output: 154
🏁 Script executed:
# Read streaming_v2.py to see context around lines 107-114 and 143-147
wc -l codeframe/ui/routers/streaming_v2.py
Repository: frankbria/codeframe
Length of output: 103
🏁 Script executed:
# Read core/streaming.py to check EventPublisher implementation
wc -l codeframe/core/streaming.py
Repository: frankbria/codeframe
Length of output: 94
🏁 Script executed:
# Read the full streaming_v2.py file
cat -n codeframe/ui/routers/streaming_v2.py
Repository: frankbria/codeframe
Length of output: 5950
🏁 Script executed:
# Read core/streaming.py to find EventPublisher class definition
cat -n codeframe/core/streaming.py | head -200
Repository: frankbria/codeframe
Length of output: 7011
🏁 Script executed:
# Find the EventPublisher class definition in core/streaming.py
ast-grep --pattern $'class EventPublisher {
$$$
}'
Repository: frankbria/codeframe
Length of output: 45
🏁 Script executed:
# Also search for public subscribe methods
rg -n 'def subscribe|def unsubscribe|async def subscribe|async def unsubscribe' codeframe/core/streaming.py -C3
Repository: frankbria/codeframe
Length of output: 1059
🏁 Script executed:
# Read the full EventPublisher class definition and methods
sed -n '290,350p' codeframe/core/streaming.py
Repository: frankbria/codeframe
Length of output: 2551
🏁 Script executed:
# Read the unsubscribe method
sed -n '472,490p' codeframe/core/streaming.py
Repository: frankbria/codeframe
Length of output: 814
🏁 Script executed:
# Also read the _Subscription class to understand its structure
sed -n '200,290p' codeframe/core/streaming.py
Repository: frankbria/codeframe
Length of output: 2928
🏁 Script executed:
# Check how subscribe() is being used elsewhere in the codebase
rg -n 'publisher\.subscribe\|\.subscribe\(' --type=py -C2 | head -100
Repository: frankbria/codeframe
Length of output: 45
🏁 Script executed:
# Check if there are any other places accessing EventPublisher internals
rg -n 'publisher\._lock|publisher\._subscribers|_Subscription' --type=py -C2
Repository: frankbria/codeframe
Length of output: 3523
🏁 Script executed:
# Let's see the full subscribe() method implementation to understand what it returns
sed -n '319,365p' codeframe/core/streaming.py
Repository: frankbria/codeframe
Length of output: 2019
🏁 Script executed:
# Check if there are other SSE/streaming handlers that might use subscribe() properly
rg -n 'async def.*stream|AsyncGenerator' codeframe/ui/routers/ --type=py
Repository: frankbria/codeframe
Length of output: 432
🏁 Script executed:
# Check how stream_task_events in tasks_v2.py implements streaming
sed -n '856,920p' codeframe/ui/routers/tasks_v2.py
Repository: frankbria/codeframe
Length of output: 2063
Use the public EventPublisher.subscribe() method instead of directly accessing private internals.
The router directly imports _Subscription and accesses publisher._lock and publisher._subscribers (lines 110-114, 143-147), breaking encapsulation and violating the thin adapter pattern. EventPublisher already exposes a public subscribe(task_id) method that handles subscription management, queue creation, and cleanup. The HTTP-specific concerns (heartbeats via heartbeat_interval, request disconnection detection) should be layered on top of the public API rather than reimplementing subscription logic by reaching into private state.
🤖 Prompt for AI Agents
In `@codeframe/ui/routers/streaming_v2.py` around lines 107 - 114, The code is
directly importing and using the private _Subscription class and touching
publisher._lock and publisher._subscribers; replace that with the public
EventPublisher.subscribe(task_id) call and let the publisher manage
queue/lock/subscriber lifecycle. Call publisher.subscribe(task_id) to obtain the
subscription object (or subscription handle) and then implement HTTP-specific
behavior (heartbeat using heartbeat_interval, request disconnection monitoring
and cleanup) on top of that returned handle rather than mutating
publisher._subscribers or publisher._lock or importing _Subscription. Ensure you
remove direct references to _Subscription, publisher._lock, and
publisher._subscribers and use the public subscribe/unsubscribe APIs for
cleanup.
    async with publisher._lock:
        if subscription in publisher._subscribers[task_id]:
            publisher._subscribers[task_id].remove(subscription)
        if not publisher._subscribers[task_id]:
            del publisher._subscribers[task_id]
🛠️ Refactor suggestion | 🟠 Major
Cleanup duplicates the private-API coupling from subscription setup.
The finally block again accesses publisher._lock and publisher._subscribers directly. If a public unsubscribe(subscription) method existed on EventPublisher, both the setup (lines 113-114) and teardown would be cleaner and less fragile.
Also note: if publisher._subscribers is a defaultdict, the del on line 147 removes the key, but a subsequent append (from another connection) would silently recreate it. If it's a plain dict, accessing a non-existent task_id in line 144 could KeyError if another coroutine already deleted the key between the _lock acquisition and the in check — though the async lock should prevent this within a single event loop.
🤖 Prompt for AI Agents
In `@codeframe/ui/routers/streaming_v2.py` around lines 143 - 147, Add a public
unsubscribe method on EventPublisher (e.g.,
EventPublisher.unsubscribe(subscription)) and use it from the finally block
instead of touching publisher._lock and publisher._subscribers directly;
implement unsubscribe to take care of acquiring the internal lock, safely
locating the subscriber list for subscription.task_id (use get to avoid
KeyError), remove the subscription if present, and delete the task_id key only
if the list is empty (avoiding recreating behavior for defaultdicts), so both
the setup (where subscriptions are added) and teardown use the same safe, public
API.
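The public subscribe/unsubscribe pair described above can be sketched roughly as follows. This is a minimal illustration, not the project's EventPublisher: the `Subscription` and `SketchPublisher` classes and their fields are hypothetical stand-ins, and the real class in codeframe/core/streaming.py may differ in signature and behavior.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass(eq=False)  # identity comparison, so `in` and `remove` match this exact object
class Subscription:
    """Hypothetical stand-in for the project's private _Subscription."""

    task_id: str
    queue: asyncio.Queue = field(default_factory=lambda: asyncio.Queue(maxsize=1000))


class SketchPublisher:
    """Hypothetical publisher; only subscriber bookkeeping is shown."""

    def __init__(self) -> None:
        self._lock = asyncio.Lock()
        self._subscribers: dict[str, list[Subscription]] = {}

    async def subscribe(self, task_id: str) -> Subscription:
        subscription = Subscription(task_id)
        async with self._lock:
            self._subscribers.setdefault(task_id, []).append(subscription)
        return subscription

    async def unsubscribe(self, subscription: Subscription) -> None:
        async with self._lock:
            # .get() avoids a KeyError on a plain dict and avoids silently
            # re-creating the key if the mapping were a defaultdict.
            subscribers = self._subscribers.get(subscription.task_id)
            if subscribers is None:
                return
            if subscription in subscribers:
                subscribers.remove(subscription)
            if not subscribers:
                del self._subscribers[subscription.task_id]
```

With an API of this shape, the router's `finally` block would reduce to a single `await publisher.unsubscribe(subscription)`, and calling it twice is harmless.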
    def _run_agent():
        try:
            runtime.execute_agent(
                workspace,
                run,
                dry_run=dry_run,
                verbose=verbose,
                event_publisher=publisher,
            )
        except Exception as exc:
            logger.error(f"Background agent failed for task {task_id}: {exc}", exc_info=True)
            publisher.publish_sync(
                task_id,
                ErrorEvent(
                    task_id=task_id,
                    error=str(exc),
                    error_type=type(exc).__name__,
                ),
            )
            publisher.complete_task_sync(task_id)

    thread = threading.Thread(target=_run_agent, daemon=True)
    thread.start()
Daemon thread for agent execution is fire-and-forget with no lifecycle management.
Several concerns with this approach:
1. Daemon thread: If the server process shuts down, the agent is killed mid-execution with no cleanup, potentially leaving tasks in an inconsistent state (e.g., IN_PROGRESS forever).
2. No thread tracking: The spawned thread isn't stored anywhere; there's no way to check if execution is already running, join it on shutdown, or enforce one-agent-per-task.
3. Nested exception risk: If publisher.publish_sync or publisher.complete_task_sync on lines 615-623 themselves raise, the error is silently swallowed (no outer try/catch inside _run_agent's except block).
For (3), consider wrapping the error-publishing calls:
Proposed defensive fix for error publishing
         except Exception as exc:
             logger.error(f"Background agent failed for task {task_id}: {exc}", exc_info=True)
    -        publisher.publish_sync(
    -            task_id,
    -            ErrorEvent(
    -                task_id=task_id,
    -                error=str(exc),
    -                error_type=type(exc).__name__,
    -            ),
    -        )
    -        publisher.complete_task_sync(task_id)
    +        try:
    +            publisher.publish_sync(
    +                task_id,
    +                ErrorEvent(
    +                    task_id=task_id,
    +                    error=str(exc),
    +                    error_type=type(exc).__name__,
    +                ),
    +            )
    +            publisher.complete_task_sync(task_id)
    +        except Exception as pub_exc:
    +            logger.error(
    +                f"Failed to publish error event for task {task_id}: {pub_exc}",
    +                exc_info=True,
    +            )
Items (1) and (2) are broader architectural concerns that may be acceptable for an initial implementation but should be tracked.
🤖 Prompt for AI Agents
In `@codeframe/ui/routers/tasks_v2.py` around lines 604 - 626, The current
_run_agent fire-and-forget uses a daemon thread and doesn't manage lifecycle or
protect the error-publishing calls; fix by 1) replacing the daemon behavior with
tracked threads: before starting, check a shared registry (e.g.,
active_agent_threads keyed by task_id) to avoid duplicate runs and register the
Thread instance (threading.Thread(target=_run_agent)) so it can be joined on
shutdown; 2) make the thread non-daemon and ensure you remove the entry from
active_agent_threads when finished (in a finally block) so state stays
consistent; and 3) harden the exception handler inside _run_agent: wrap
publisher.publish_sync and publisher.complete_task_sync in their own try/except
blocks (log any failures with logger.error including exception info and
error_type), and ensure a final attempt to mark the task complete or failed
(using task_id and ErrorEvent) in a finally section so runtime.execute_agent
failures cannot leave tasks IN_PROGRESS and publishing errors are surfaced.
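One way the three points in this prompt could fit together is sketched below. It is a minimal illustration under stated assumptions: `start_agent_thread`, `_registry_lock`, and the three callables are hypothetical names introduced here (only `active_agent_threads` is suggested by the review), and the callables stand in for `runtime.execute_agent`, `publisher.publish_sync`, and `publisher.complete_task_sync`.

```python
import logging
import threading
from typing import Callable

logger = logging.getLogger(__name__)

# Hypothetical registry; the review only suggests the name.
active_agent_threads: dict[str, threading.Thread] = {}
_registry_lock = threading.Lock()


def start_agent_thread(
    task_id: str,
    run_agent: Callable[[], None],
    publish_error: Callable[[Exception], None],
    complete_task: Callable[[], None],
) -> bool:
    """Start at most one tracked, non-daemon thread per task.

    Returns False when a thread for this task is already registered.
    """

    def _target() -> None:
        try:
            run_agent()
        except Exception as exc:
            logger.error("Background agent failed for task %s: %s", task_id, exc, exc_info=True)
            # Each call gets its own guard, so a publish failure
            # cannot prevent the stream from being completed.
            try:
                publish_error(exc)
            except Exception:
                logger.error("Failed to publish error event for task %s", task_id, exc_info=True)
            try:
                complete_task()
            except Exception:
                logger.error("Failed to complete stream for task %s", task_id, exc_info=True)
        finally:
            with _registry_lock:
                active_agent_threads.pop(task_id, None)

    with _registry_lock:
        if task_id in active_agent_threads:
            return False
        thread = threading.Thread(target=_target, name=f"agent-{task_id}")
        # Register before starting so the thread's cleanup never runs first.
        active_agent_threads[task_id] = thread
        thread.start()
    return True
```

On shutdown, a server hook could snapshot `active_agent_threads.values()` and join each thread with a timeout. Whether non-daemon threads are acceptable depends on how long an agent run may block shutdown, which this sketch does not address.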
    @router.get("/{task_id}/stream")
    async def stream_task_events(
        request: Request,
        task_id: str,
        workspace: Workspace = Depends(get_v2_workspace),
    ) -> StreamingResponse:
        """Stream structured execution events for a task via SSE.

        Returns Server-Sent Events with JSON-formatted ExecutionEvent payloads.
        Compatible with browser EventSource (no custom auth headers required).

        Event types (in data.event_type):
        - ``progress``: Phase/step transitions
        - ``output``: stdout/stderr lines
        - ``blocker``: Human input needed
        - ``completion``: Task finished (success/failure/blocked)
        - ``error``: Execution error
        - ``heartbeat``: Keep-alive

        For raw text output lines (cf work follow equivalent),
        use GET /{task_id}/output instead.
        """
        task = tasks.get(workspace, task_id)
        if not task:
            raise HTTPException(
                status_code=404,
                detail=api_error("Task not found", ErrorCodes.NOT_FOUND, f"No task with id {task_id}"),
            )

        from codeframe.ui.routers.streaming_v2 import event_stream_generator, get_event_publisher

        publisher = get_event_publisher()

        return StreamingResponse(
            event_stream_generator(task_id, publisher, request),
            media_type="text/event-stream",
            headers={
                "Cache-Control": "no-cache",
                "Connection": "keep-alive",
                "X-Accel-Buffering": "no",
            },
        )
Missing @rate_limit_standard() on the structured event stream endpoint.
stream_task_output_lines (line 750) has @rate_limit_standard(), but stream_task_events does not. This is an SSE endpoint that holds a long-lived connection and consumes server resources — without rate limiting, a client can open unbounded connections per task or across tasks.
Proposed fix
     @router.get("/{task_id}/stream")
    +@rate_limit_standard()
     async def stream_task_events(
🤖 Prompt for AI Agents
In `@codeframe/ui/routers/tasks_v2.py` around lines 855 - 896, Add the standard
rate limit decorator to the SSE endpoint: apply `@rate_limit_standard()` directly
above the async def stream_task_events(...) function (same style as used on
stream_task_output_lines) so the long-lived Server-Sent Events connection is
subject to the same request limits; ensure the decorator is properly imported or
already available in the module and keep the decorator placement immediately
above the function definition.
Next.js rewrite proxy buffers chunked responses, which prevents SSE events from streaming in real-time. EventSource now connects directly to the FastAPI backend via NEXT_PUBLIC_SSE_URL (defaults to http://localhost:8000). Also quiets expected stop-button errors when no active run exists.
Code Review - Execution Monitor PR #339
Summary
This PR delivers a comprehensive real-time execution monitoring system with SSE streaming, live event display, and inline blocker handling. The implementation is well-architected with proper separation of concerns between hooks, components, and API layers.
✅ Strengths
1. Excellent Architecture & Separation of Concerns
2. Performance Optimizations
3. Robust State Management
4. Excellent UX Patterns
5. Comprehensive Test Coverage
🔍 Issues & Recommendations
1. Security & Input Validation
Medium Priority: The blocker answer submission lacks input validation:
    // web-ui/src/components/execution/BlockerEvent.tsx:28
    const handleSubmit = async () => {
      if (!answer.trim() || isSubmitting) return;
      // No max length check or sanitization
      await blockersApi.answer(workspacePath, String(event.blocker_id), answer.trim());
Recommendation: Add max length validation and sanitize input:
    const MAX_ANSWER_LENGTH = 10000;
    const handleSubmit = async () => {
      const trimmed = answer.trim();
      if (!trimmed || isSubmitting || trimmed.length > MAX_ANSWER_LENGTH) return;
      await blockersApi.answer(workspacePath, String(event.blocker_id), trimmed);
    };
2. Error Handling
Low Priority: SSE connection errors could be more user-friendly:
    // web-ui/src/hooks/useTaskStream.ts:144-146
    } catch {
      // Ignore malformed messages (e.g. SSE comments)
    }
Recommendation: Log to console in development for debugging:
    } catch (err) {
      if (process.env.NODE_ENV === 'development') {
        console.warn('Failed to parse SSE event:', data, err);
      }
    }
3. Memory Leak Potential
Low Priority: EventPublisher subscription cleanup in streaming_v2.py:142-148 has a potential race condition if multiple subscriptions are removed concurrently.
Recommendation: The current implementation with async context manager is good, but consider adding a try-finally around the subscription list manipulation for extra safety.
4. Accessibility
Low Priority: Some dynamic content lacks ARIA live regions:
Recommendation: Add to EventStream component:
    <div role="log" aria-live="polite" aria-label="Execution events">
5. Type Safety
Low Priority: In useTaskStream.ts:113, the SSE URL construction could be typed more strictly:
    const sseBase = process.env.NEXT_PUBLIC_SSE_URL || 'http://localhost:8000';
Recommendation: Validate URL format and provide type guard.
📋 Alignment with CLAUDE.md
✅ Follows v2 Architecture:
✅ Phase 2 Deliverables:
🎯 Code Quality Assessment
✅ Final Verdict
APPROVE with minor suggestions
This is high-quality work that delivers the core execution monitoring functionality with proper architecture and performance characteristics. The identified issues are minor and can be addressed in follow-up PRs or as inline improvements.
Key Achievements:
Suggested Follow-ups (not blockers):
🧪 Testing Recommendations
Before merging, verify:
Great work on this feature! The execution monitor will significantly improve the developer experience. 🚀
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@web-ui/src/app/execution/[taskId]/page.tsx`:
- Around line 26-29: getSelectedWorkspacePath() currently guards SSR but calls
localStorage.getItem() without protection; modify getSelectedWorkspacePath (the
function referenced by useEffect in page.tsx) to wrap any localStorage access in
a try/catch and return null on error so workspace hydration via
setWorkspacePath(getSelectedWorkspacePath()) won't throw in incognito/restricted
contexts; ensure the catch swallows or logs the error and returns null (do not
change the useEffect), preserving existing behavior when window is undefined or
item is missing.
🧹 Nitpick comments (3)
web-ui/src/hooks/useTaskStream.ts (1)
119-149: Unstable handleMessage reference may trigger SSE reconnections.
handleMessage depends on all six callback props (onEvent, onProgress, onOutput, onBlocker, onComplete, onError). If any caller passes inline arrow functions (not wrapped in useCallback), handleMessage gets a new identity every render, which likely causes useEventSource to tear down and re-establish the SSE connection.
Consider using a useRef to hold the latest callbacks, so handleMessage has a stable identity:
♻️ Sketch using refs for stable identity
    +  const callbacksRef = useRef({ onEvent, onProgress, onOutput, onBlocker, onComplete, onError });
    +  useEffect(() => {
    +    callbacksRef.current = { onEvent, onProgress, onOutput, onBlocker, onComplete, onError };
    +  });
    +
       const handleMessage = useCallback(
         (data: string) => {
           try {
             const event = JSON.parse(data) as ExecutionEvent;
             setLastEvent(event);
    -        onEvent?.(event);
    +        callbacksRef.current.onEvent?.(event);
             switch (event.event_type) {
               case 'progress':
    -            onProgress?.(event);
    +            callbacksRef.current.onProgress?.(event);
                 break;
               // … same for other cases
             }
           } catch { }
         },
    -    [onEvent, onProgress, onOutput, onBlocker, onComplete, onError]
    +    [] // stable — reads latest callbacks from ref
       );
web-ui/src/app/execution/[taskId]/page.tsx (2)
70-77: Use <Link> instead of <a> for internal navigation consistency.
Line 74 uses a raw <a href="/tasks"> for internal navigation, while line 86 uses Next.js <Link>. Using <Link> enables client-side navigation without a full page reload.
♻️ Proposed fix
    - <a href="/tasks" className="text-blue-600 hover:underline">Back to Task Board</a>
    + <Link href="/tasks" className="text-blue-600 hover:underline">Back to Task Board</Link>
61-64: No-op callback could be replaced with a comment or removed.
handleBlockerAnswered is an empty useCallback that does nothing. The inline comment explains why, which is good. However, if EventStream's onBlockerAnswered prop is optional, you could skip passing it entirely and let the comment live near line 147 instead.
    useEffect(() => {
      setWorkspacePath(getSelectedWorkspacePath());
      setWorkspaceReady(true);
    }, []);
🧩 Analysis chain
🏁 Script executed:
rg "getSelectedWorkspacePath" --type ts --type tsx -B 2 -A 10
Repository: frankbria/codeframe
Length of output: 90
🏁 Script executed:
rg "getSelectedWorkspacePath" -A 10 -B 2
Repository: frankbria/codeframe
Length of output: 18007
🏁 Script executed:
cat web-ui/src/lib/workspace-storage.ts
Repository: frankbria/codeframe
Length of output: 2638
Workspace hydration should handle localStorage access errors.
getSelectedWorkspacePath() at line 26 checks for SSR context (typeof window === 'undefined') but does not wrap localStorage.getItem() in a try/catch. In incognito/private mode or restricted iframe contexts, localStorage can throw an error, which would propagate as an unhandled exception. Wrap the call in try/catch to gracefully return null on access errors.
🤖 Prompt for AI Agents
In `@web-ui/src/app/execution/[taskId]/page.tsx` around lines 26 - 29,
getSelectedWorkspacePath() currently guards SSR but calls localStorage.getItem()
without protection; modify getSelectedWorkspacePath (the function referenced by
useEffect in page.tsx) to wrap any localStorage access in a try/catch and return
null on error so workspace hydration via
setWorkspacePath(getSelectedWorkspacePath()) won't throw in incognito/restricted
contexts; ensure the catch swallows or logs the error and returns null (do not
change the useEffect), preserving existing behavior when window is undefined or
item is missing.
    const agentState =
      !prev.isCompleted && (sseStatus === 'error' || sseStatus === 'closed')
        ? 'DISCONNECTED'
        : prev.agentState;
🟢 Low
hooks/useExecutionMonitor.ts:164 When SSE reconnects (sseStatus becomes 'open'), agentState remains 'DISCONNECTED' because the code just preserves prev.agentState. Consider re-deriving agentState from the last event when the connection recovers.
    - const agentState =
    -   !prev.isCompleted && (sseStatus === 'error' || sseStatus === 'closed')
    -     ? 'DISCONNECTED'
    -     : prev.agentState;
    + const agentState =
    +   !prev.isCompleted && (sseStatus === 'error' || sseStatus === 'closed')
    +     ? 'DISCONNECTED'
    +     : prev.agentState === 'DISCONNECTED' && sseStatus === 'open'
    +       ? (findLastNonHeartbeat(eventsRef.current) ? deriveAgentState(findLastNonHeartbeat(eventsRef.current)!) : 'CONNECTING')
    +       : prev.agentState;
The execution monitor was stuck on "Connecting" because:
1. Heartbeats were SSE comments (ignored by EventSource onmessage)
2. Already-completed tasks had no events to deliver (EventPublisher drops events when no subscribers exist)
Now the /stream endpoint:
- Sends an initial ProgressEvent immediately (transitions UI from CONNECTING to EXECUTING)
- For tasks with a terminal run (COMPLETED/FAILED/BLOCKED), emits a synthetic CompletionEvent so the UI shows the final state
- For active tasks, delegates to the live event_stream_generator
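The first cause is a property of the SSE wire format: a line starting with a colon is a comment and never produces a message event, while a `data:` line does. The sketch below illustrates the difference with a deliberately simplified parser. The helper names are hypothetical, the parser handles only `\n` line endings and the `data` field, and it is not the project's `format_sse_event`.

```python
import json


def sse_comment(text: str) -> str:
    """Frame a keep-alive as an SSE comment (ignored by EventSource.onmessage)."""
    return f": {text}\n\n"


def sse_data(payload: dict) -> str:
    """Frame a JSON payload as an SSE data event (delivered to onmessage)."""
    return f"data: {json.dumps(payload)}\n\n"


def dispatched_messages(stream: str) -> list[str]:
    """Simplified model of which frames a browser would dispatch as messages."""
    messages = []
    for frame in stream.split("\n\n"):
        data_lines = []
        for line in frame.split("\n"):
            if line.startswith(":"):
                continue  # comment line: dropped without dispatching anything
            if line.startswith("data:"):
                value = line[len("data:"):]
                # A single leading space after the colon is stripped.
                data_lines.append(value[1:] if value.startswith(" ") else value)
        if data_lines:
            messages.append("\n".join(data_lines))
    return messages


stream = sse_comment("heartbeat") + sse_data({"event_type": "progress", "phase": "connected"})
for message in dispatched_messages(stream):
    print(message)
```

Only the `data:` frame is printed, which is consistent with the fix of sending an initial ProgressEvent as a data event so the client leaves the CONNECTING state.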
Code Review - Recent Updates to PR #339
I've reviewed the latest changes (3 commits since the last review) and the overall PR implementation. The recent fixes have addressed critical SSE streaming issues and improved the user experience significantly.
✅ Recent Fixes Reviewed
1. Error Context Enhancement (commit 762d111)
Shows full error context in
2. SSE Direct Backend Connection (commit 97147c9)
CRITICAL FIX: Bypassing Next.js proxy for SSE is the correct approach. Next.js rewrites buffer chunked responses, breaking SSE streaming. The implementation at
✅ Excellent architectural decision - this is exactly how SSE should be handled with Next.js.
3. Initial SSE Event & Terminal State Handling (commit d61f7d9)
MAJOR UX IMPROVEMENT: The backend now sends an initial
The terminal state detection (
🔍 Detailed Code Quality Assessment
Strengths ⭐⭐⭐⭐⭐
🐛 Issues Found (Minor - Non-Blocking)
1. Type Safety - Medium Priority
Location:
Issue: No validation that
Recommendation: Add URL validation helper function to validate format and provide better error messages in development.
2. Error Logging - Low Priority
Location:
Issue: Silent error suppression makes debugging difficult. Previous review suggested logging in development mode.
Recommendation: Add
3. Race Condition in Subscription Cleanup - Low Priority
Location:
Issue: The current code is safe but could be more defensive with an additional check before deleting from subscribers dict.
4. Environment Variable Documentation - Low Priority
Issue:
📋 Alignment with CLAUDE.md
✅ V2 Architecture Compliance:
✅ Phase 2 Complete, Phase 3 In Progress:
🎯 Final Assessment
✅ Recommendation: APPROVE
The recent fixes have resolved critical issues (SSE buffering, connection state, terminal task handling) and the code quality is excellent. The identified issues are minor and can be addressed in follow-up PRs.
Key Achievements Since Last Review:
Suggested Follow-ups (Not Blockers):
Great work on this comprehensive feature! The execution monitor significantly improves the developer experience and the implementation quality is production-ready. 🚀
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@codeframe/ui/routers/tasks_v2.py`:
- Around line 897-938: Race condition: subscribe to the publisher before or
re-check run state after subscribing so CompletionEvent isn't missed; modify
_generate in tasks_v2.py to (1) call event_stream_generator (or otherwise
subscribe to EventPublisher) before relying on get_active_run/get_latest_run, or
(preferred) after you obtain the subscription, re-query
runtime.get_latest_run(task_id) (and/or runtime.get_active_run) inside _generate
and if that newly fetched latest_run is terminal emit a synthetic
CompletionEvent via format_sse_event(CompletionEvent(...)) before entering the
async for loop over event_stream_generator so any completion that happened
during subscription is delivered to the client; reference functions/vars:
get_active_run, get_latest_run, already_terminal, _generate,
event_stream_generator, EventPublisher, CompletionEvent, format_sse_event.
    run = runtime.get_active_run(workspace, task_id)
    latest_run = runtime.get_latest_run(workspace, task_id) if not run else None
    already_terminal = (
        latest_run is not None
        and latest_run.status in (RunStatus.COMPLETED, RunStatus.FAILED, RunStatus.BLOCKED)
    ) if not run else False

    async def _generate():
        # Always send an initial progress event so the browser's EventSource
        # fires onmessage and the client transitions from CONNECTING state.
        yield format_sse_event(
            ProgressEvent(
                task_id=task_id,
                phase="connected",
                step=0,
                total_steps=0,
                message="Stream connected",
            )
        )

        if already_terminal:
            # Task already finished — emit a synthetic completion event.
            status_map = {
                RunStatus.COMPLETED: "completed",
                RunStatus.FAILED: "failed",
                RunStatus.BLOCKED: "blocked",
            }
            duration = 0.0
            if latest_run.started_at and latest_run.completed_at:
                duration = (latest_run.completed_at - latest_run.started_at).total_seconds()
            yield format_sse_event(
                CompletionEvent(
                    task_id=task_id,
                    status=status_map[latest_run.status],
                    duration_seconds=duration,
                )
            )
            return

        # Live stream — subscribe to the EventPublisher for real-time events.
        async for chunk in event_stream_generator(task_id, publisher, request):
            yield chunk
Race condition: task can complete between the active-run check and the publisher subscription.
If the agent finishes (and publishes its CompletionEvent + END_OF_STREAM) after get_active_run returns a non-None run (line 897) but before event_stream_generator subscribes to the publisher (line 937), the client will never receive the completion event. The EventPublisher has no replay buffer, so those events are lost, and the stream will hang indefinitely on heartbeats.
Consider one of:
1. Subscribe to the publisher before checking the run status, so no events are missed during the check.
2. After subscribing, re-check the run status inside _generate() and emit a synthetic completion if the run became terminal while connecting.
3. Add a bounded replay buffer to EventPublisher so late subscribers see recent events.
Sketch of option 2 (re-check after subscribe)
     async def _generate():
         yield format_sse_event(
             ProgressEvent(
                 task_id=task_id,
                 phase="connected",
                 step=0,
                 total_steps=0,
                 message="Stream connected",
             )
         )
         if already_terminal:
             # ... existing synthetic completion logic ...
             return
    -    async for chunk in event_stream_generator(task_id, publisher, request):
    -        yield chunk
    +    # Subscribe first, then re-check to close the race window
    +    queue = asyncio.Queue(maxsize=1000)
    +    loop = asyncio.get_running_loop()
    +    from codeframe.core.streaming import _Subscription
    +    subscription = _Subscription(task_id, queue, loop)
    +    async with publisher._lock:
    +        publisher._subscribers[task_id].append(subscription)
    +
    +    # Re-check: did the run complete while we were setting up?
    +    recheck_run = runtime.get_active_run(workspace, task_id)
    +    if recheck_run is None:
    +        latest = runtime.get_latest_run(workspace, task_id)
    +        if latest and latest.status in (RunStatus.COMPLETED, RunStatus.FAILED, RunStatus.BLOCKED):
    +            # Emit synthetic completion and bail out
    +            yield format_sse_event(
    +                CompletionEvent(task_id=task_id, status=status_map[latest.status], duration_seconds=0.0)
    +            )
    +            # Clean up subscription
    +            async with publisher._lock:
    +                if subscription in publisher._subscribers[task_id]:
    +                    publisher._subscribers[task_id].remove(subscription)
    +            return
    +
    +    # Proceed with normal event streaming using the already-established subscription
    +    async for chunk in event_stream_generator(task_id, publisher, request, _subscription=subscription):
    +        yield chunk
🤖 Prompt for AI Agents
In `@codeframe/ui/routers/tasks_v2.py` around lines 897 - 938, Race condition:
subscribe to the publisher before or re-check run state after subscribing so
CompletionEvent isn't missed; modify _generate in tasks_v2.py to (1) call
event_stream_generator (or otherwise subscribe to EventPublisher) before relying
on get_active_run/get_latest_run, or (preferred) after you obtain the
subscription, re-query runtime.get_latest_run(task_id) (and/or
runtime.get_active_run) inside _generate and if that newly fetched latest_run is
terminal emit a synthetic CompletionEvent via
format_sse_event(CompletionEvent(...)) before entering the async for loop over
event_stream_generator so any completion that happened during subscription is
delivered to the client; reference functions/vars: get_active_run,
get_latest_run, already_terminal, _generate, event_stream_generator,
EventPublisher, CompletionEvent, format_sse_event.
run = runtime.get_active_run(workspace, task_id)
latest_run = runtime.get_latest_run(workspace, task_id) if not run else None
already_terminal = (
    latest_run is not None
    and latest_run.status in (RunStatus.COMPLETED, RunStatus.FAILED, RunStatus.BLOCKED)
) if not run else False

async def _generate():
    # Always send an initial progress event so the browser's EventSource
    # fires onmessage and the client transitions from CONNECTING state.
    yield format_sse_event(
        ProgressEvent(
            task_id=task_id,
            phase="connected",
            step=0,
            total_steps=0,
            message="Stream connected",
        )
    )

    if already_terminal:
        # Task already finished; emit a synthetic completion event.
        status_map = {
            RunStatus.COMPLETED: "completed",
            RunStatus.FAILED: "failed",
            RunStatus.BLOCKED: "blocked",
        }
        duration = 0.0
        if latest_run.started_at and latest_run.completed_at:
            duration = (latest_run.completed_at - latest_run.started_at).total_seconds()
        yield format_sse_event(
            CompletionEvent(
                task_id=task_id,
                status=status_map[latest_run.status],
                duration_seconds=duration,
            )
        )
        return
Never-executed task falls through to an indefinite live stream with no events.
When there's no active run and no latest run (task was created but never started), already_terminal is False and the generator subscribes to the publisher. Since no agent will ever publish events for this task, the client receives only heartbeats indefinitely.
Consider returning a synthetic completion/error event (or an HTTP 409/404) for tasks that have never been executed, so the client doesn't hang.
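The start-up cases discussed in this comment (active run, already-terminal run, never-started task) can be separated by a small pure function before the generator is built. This is a minimal sketch under stated assumptions: the `RunStatus` enum here is a hypothetical subset redefined for self-containment, and `StreamStart` and `classify_stream_start` are illustrative names that do not exist in the codebase.

```python
from enum import Enum
from typing import Optional


class RunStatus(Enum):
    # Hypothetical subset mirroring the statuses named in the review.
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    BLOCKED = "blocked"


TERMINAL_STATUSES = {RunStatus.COMPLETED, RunStatus.FAILED, RunStatus.BLOCKED}


class StreamStart(Enum):
    LIVE = "live"                          # subscribe and stream events
    ALREADY_TERMINAL = "already_terminal"  # emit synthetic completion, return
    NEVER_STARTED = "never_started"        # emit an error/completion, return


def classify_stream_start(
    active_status: Optional[RunStatus],
    latest_status: Optional[RunStatus],
) -> StreamStart:
    """Decide how the SSE endpoint should begin, given the run lookups."""
    if active_status is not None:
        return StreamStart.LIVE
    if latest_status is None:
        # No active run and no historical run: nothing will ever publish.
        return StreamStart.NEVER_STARTED
    if latest_status in TERMINAL_STATUSES:
        return StreamStart.ALREADY_TERMINAL
    # No active run but a non-terminal latest run: keep the existing
    # behaviour of falling through to the live stream.
    return StreamStart.LIVE
```

Keeping the decision in one place makes the never-started branch explicit and easy to unit test without opening an SSE connection.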
+ never_started = run is None and latest_run is None
+
  async def _generate():
      yield format_sse_event(
          ProgressEvent(
              task_id=task_id,
              phase="connected",
              step=0,
              total_steps=0,
              message="Stream connected",
          )
      )
+     if never_started:
+         yield format_sse_event(
+             CompletionEvent(
+                 task_id=task_id,
+                 status="failed",
+                 duration_seconds=0.0,
+                 message="No execution run exists for this task",
+             )
+         )
+         return
+
      if already_terminal:

When the server sends a completion event and closes the stream, the EventSource retry counter was always 0 (reset by onmessage) causing infinite reconnection. Now useTaskStream closes the EventSource immediately upon receiving a completion event.
Code Review: Execution Monitor with Real-time SSE Streaming

This is a well-structured implementation that delivers a comprehensive execution monitoring experience. The code quality is generally high with good test coverage.

✅ Strengths

🔴 Critical Issues

1. Memory Leak in Batch Monitor Polling

The polling interval cleanup may not fire properly when batch status changes rapidly. If batch.status transitions while an interval is active, a new interval starts without clearing the old one.

Fix: Clear the existing interval before starting a new one in the useEffect body.

2. Unhandled Promise Rejections

Task fetch failures are silently logged but not shown to the user, potentially leaving missing task names in the UI.

Fix: Set an error state or show a placeholder when promises reject.

🟡 Important Issues

3. Missing Error Boundary

The execution pages lack error boundaries to catch runtime errors in child components.

4. Race Condition in SSE Connection

Potential race where sseStatusRef.current is checked before the sseStatus state update completes.

Fix: Use sseStatus from state closure inside setState instead of checking the ref.

🔵 Minor Improvements

✅ Security & Best Practices

🧪 Test Coverage

Excellent coverage (253 tests passing). Consider adding tests for:

🎯 Summary

Overall: APPROVE with recommendations

This successfully bridges the navigation gap from issue 331 and delivers a polished execution monitoring experience. The critical issues are fixable and do not block merging, but should be addressed soon after merge.

Priority fixes:

Estimated fix effort: 1-2 hours

Great work on the comprehensive test coverage and clean architecture!
Code Review: Execution Monitor with Real-time SSE Streaming

This is an impressive implementation of a real-time execution monitoring system. The code demonstrates excellent architecture, proper separation of concerns, and comprehensive testing. Here's my detailed review:

✅ Strengths

1. Architecture & Design Patterns

2. Backend Implementation

# streaming_v2.py - Excellent cleanup

3. Frontend Implementation

4. Test Coverage

🔍 Areas for Consideration

1. Backend: Direct _Subscription Access (Minor)

# streaming_v2.py:110-114
from codeframe.core.streaming import _Subscription
subscription = _Subscription(task_id, queue, loop)
async with publisher._lock:
    publisher._subscribers[task_id].append(subscription)

Concern: Directly accessing

Suggestion: Consider adding a public method to EventPublisher:

# In core/streaming.py
class EventPublisher:
    # `async with` is only valid inside a coroutine, so this must be `async def`.
    async def subscribe_with_queue(self, task_id: str, queue: asyncio.Queue) -> _Subscription:
        """Create and register a subscription with a provided queue."""
        subscription = _Subscription(task_id, queue, self.loop)
        async with self._lock:
            self._subscribers[task_id].append(subscription)
        return subscription

This would:
2. Frontend: Race Condition in useExecutionMonitor (Minor)

// useExecutionMonitor.ts:104-106
if (sseStatusRef.current === 'error' || sseStatusRef.current === 'closed') {
  agentState = 'DISCONNECTED';
}

Potential issue: The rAF flush callback reads

Impact: Low - this would only cause a brief visual inconsistency (wrong badge color for 1 frame).

Suggestion: Could be eliminated by:

const scheduleFlush = useCallback(() => {
  if (rafRef.current !== null) return;
  rafRef.current = requestAnimationFrame(() => {
    rafRef.current = null;
    const events = [...eventsRef.current];
    const lastNonHeartbeat = findLastNonHeartbeat(events);
    const currentSSEStatus = sseStatusRef.current; // Capture once
    let agentState: UIAgentState = lastNonHeartbeat
      ? deriveAgentState(lastNonHeartbeat)
      : 'CONNECTING';
    if (currentSSEStatus === 'error' || currentSSEStatus === 'closed') {
      agentState = 'DISCONNECTED';
    }
    // ... rest of logic
  });
}, []); // Remove sseStatusRef from deps - it's a ref

3. Error Handling: Blocker Answer Submission (Documentation)

// [taskId]/page.tsx:47-58
const handleStop = useCallback(async () => {
  try {
    await tasksApi.stopExecution(workspacePath, taskId);
  } catch (err) {
    // Silently ignore 400/404
  }
}, [workspacePath, taskId]);

Observation: This pattern appears in multiple places where API errors are intentionally swallowed. While the comments explain this is expected for already-completed runs, users might not see meaningful error messages if something truly goes wrong.

Suggestion: Consider:

4. Performance: Event Filtering in EventStream (Very Minor)

// EventStream.tsx:33
const displayEvents = events.filter((e) => e.event_type !== 'heartbeat');

Observation: This filter runs on every render. With thousands of events, this could become expensive.

Suggestion: Consider memoization:

const displayEvents = useMemo(
  () => events.filter((e) => e.event_type !== 'heartbeat'),
  [events]
);

🔒 Security Considerations

✅ All good here:

🎯 Alignment with CLAUDE.md

✅ Excellent adherence to project guidelines:

📝 Minor Nitpicks

🎉 Summary

This is excellent work. The implementation is well-architected, thoroughly tested, and properly integrated with the existing codebase. The issues I've identified are all minor and don't block merging:

Recommendation: ✅ Approve and merge

The 253 passing tests, proper accessibility attributes, and clean separation of concerns demonstrate this is production-ready code. The execution monitor will significantly improve the developer experience for real-time task monitoring.

Great job on this PR! The SSE implementation is particularly well done with the heartbeat mechanism and graceful disconnection handling.
Summary
- `/execution/[taskId]` with auto-scrolling event stream, progress bar, agent state badges, and file change sidebar
- `/execution?batch=<id>` with accordion task rows, per-task live streaming, and batch-level stop/cancel controls

New files (19)

- `/execution/[taskId]` + landing `/execution`
- `useExecutionMonitor` (rAF-batched state accumulation over `useTaskStream`)
- `eventStyles.ts` (agent state derivation, badge styles, icons)

Modified files (5)

- `TaskBoardView.tsx`: execute handlers now navigate to `/execution`
- `AppSidebar.tsx`: Execution nav link enabled
- `api.ts`: added `blockersApi`, `batchesApi`, `tasksApi.stopExecution`/`resumeExecution`
- `types/index.ts`: added `Blocker`, `BatchResponse`, `UIAgentState` types
- `@hugeicons/react` mock: added execution monitor icons

Code review findings addressed

- `await onStop()` so loading spinner is visible
- `blocked` completion → `BLOCKED` state (not `FAILED`)
- `aria-label` on blocker textarea
- `aria-expanded` on collapsible sidebar toggle
- `aria-expanded` on batch accordion rows

Test plan

- `npx tsc --noEmit` passes (only pre-existing test file errors)
- `npx next build` succeeds (all pages generate)
- `npx jest`: 253 tests passing, 24 suites
- `/execution/[taskId]`
- `/execution?batch=<id>` with accordion rows

Closes #332
Summary by CodeRabbit
New Features
API
Tests
Chores
Dependencies